Fairness problem with semaphore acquire #4612

Closed
scullionw opened this issue Apr 11, 2022 · 12 comments · Fixed by #4624
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. E-easy Call for participation: Experience needed to fix: Easy / not much E-help-wanted Call for participation: Help is requested to fix this issue. M-coop Module: tokio/coop M-sync Module: tokio/sync

Comments

@scullionw

scullionw commented Apr 11, 2022

└── tokio v1.17.0
└── tokio-macros v1.7.0 (proc-macro)

M1 MBP
21.4.0 Darwin Kernel Version 21.4.0: Fri Mar 18 00:46:32 PDT 2022; root:xnu-8020.101.4~15/RELEASE_ARM64_T6000 arm64

Description
I am having fairness problems in tokio. In the following code, the second task never runs.

    use std::{sync::Arc, time::Duration};
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let permits = Arc::new(Semaphore::new(10));

        tokio::join!(non_cooperative_task(permits), poor_little_task());
    }

    async fn non_cooperative_task(permits: Arc<Semaphore>) {
        loop {
            let permit = permits.clone().acquire_owned().await.unwrap();
            // uncommenting the following makes it work
            // tokio::time::sleep(Duration::from_millis(1)).await;
        }
    }

    async fn poor_little_task() {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("Hello!")
        }
    }

Both tasks await, but only the first task runs.

I would understand if the first task was a hot loop, or had blocking code, since I am using tokio::join (which runs concurrently, not in parallel) and not tokio::spawn. But like I said, I am awaiting in both tasks.

@scullionw scullionw added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Apr 11, 2022
@Darksonn Darksonn added the M-sync Module: tokio/sync label Apr 11, 2022
@Darksonn
Contributor

Thanks for reporting this. This is because acquire_owned does not use Tokio's coop system. I would be happy to accept a PR that fixes this.

@Darksonn Darksonn added E-help-wanted Call for participation: Help is requested to fix this issue. E-easy Call for participation: Experience needed to fix: Easy / not much labels Apr 14, 2022
@Darksonn
Contributor

Fixing it would involve adding this line:

let coop = ready!(crate::coop::poll_proceed(cx));

then, if the acquire succeeds, you should call this:

coop.made_progress();
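
To make the two steps above concrete, here is a toy, std-only model of the pattern. It is a sketch under assumptions, not Tokio's internal code: the budget is passed in explicitly as a `Cell<u32>`, `BudgetGuard` and `poll_acquire` are hypothetical names, and only the names `poll_proceed` and `made_progress` are borrowed from the comment above. The point it tries to show is that a budget unit is only kept as spent when the operation actually made progress.

```rust
use std::cell::Cell;
use std::task::Poll;

/// Hypothetical guard: puts the spent budget unit back when dropped,
/// unless `made_progress` was called first.
struct BudgetGuard<'a> {
    budget: &'a Cell<u32>,
    previous: u32,
    progressed: bool,
}

impl BudgetGuard<'_> {
    fn made_progress(&mut self) {
        self.progressed = true;
    }
}

impl Drop for BudgetGuard<'_> {
    fn drop(&mut self) {
        if !self.progressed {
            self.budget.set(self.previous);
        }
    }
}

/// Toy `poll_proceed`: spends one unit, or returns `Pending` when none is left.
fn poll_proceed(budget: &Cell<u32>) -> Poll<BudgetGuard<'_>> {
    let previous = budget.get();
    if previous == 0 {
        return Poll::Pending;
    }
    budget.set(previous - 1);
    Poll::Ready(BudgetGuard { budget, previous, progressed: false })
}

/// Toy resource poll that follows the two steps: check the budget first,
/// then confirm progress only if the acquire succeeds.
fn poll_acquire(budget: &Cell<u32>, permit_available: bool) -> Poll<()> {
    let mut coop = match poll_proceed(budget) {
        Poll::Ready(guard) => guard,
        Poll::Pending => return Poll::Pending,
    };
    if permit_available {
        coop.made_progress();
        Poll::Ready(())
    } else {
        // The guard is dropped without progress, so the unit is restored.
        Poll::Pending
    }
}

fn main() {
    let budget = Cell::new(2);
    let ready = poll_acquire(&budget, false).is_ready();
    println!("ready = {}, budget left = {}", ready, budget.get());
    let ready = poll_acquire(&budget, true).is_ready();
    println!("ready = {}, budget left = {}", ready, budget.get());
}
```

In this model a failed acquire leaves the budget untouched, while each successful acquire costs one unit until the budget reaches zero and every further poll reports `Pending`.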

@PoorlyDefinedBehaviour
Contributor

PoorlyDefinedBehaviour commented Apr 14, 2022

I'll take this one if you don't mind @scullionw

@PoorlyDefinedBehaviour
Contributor

PoorlyDefinedBehaviour commented Apr 16, 2022

I'm stuck, so I thought I would get some opinions on this (I hope I'm not too far off):

What I understood from reading "Reducing tail latencies with automatic cooperative task yielding" is that each task has a budget.

I was taking a look at Semaphore::acquire_owned, and it calls let inner = self.ll_sem.acquire(1); which returns an Acquire future:

    pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let inner = trace::async_op(
            || self.ll_sem.acquire(1),
            self.resource_span.clone(),
            "Semaphore::acquire_owned",
            "poll",
            true,
        );
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let inner = self.ll_sem.acquire(1);

        inner.await?;
        
        Ok(OwnedSemaphorePermit {
            sem: self,
            permits: 1,
        })
    }

The Future implementation for Acquire already calls coop::poll_proceed (so that means we don't need to add coop::poll_proceed, right?):

impl Future for Acquire<'_> {
    type Output = Result<(), AcquireError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // First, ensure the current task has enough budget to proceed.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let coop = ready!(trace_poll_op!(
            "poll_acquire",
            crate::coop::poll_proceed(cx),
        ));

        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let coop = ready!(crate::coop::poll_proceed(cx));
        ...
    }
}

The tokio::join! docs say that the async expressions are evaluated in the same task (which means they share the task budget?).

What i think is happening

Since both async expressions run in the same task and share the task budget, non_cooperative_task consumes the whole budget. By the time Sleep::poll is called, the budget is already 0, which makes Sleep::poll return Poll::Pending.
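
The hypothesis above can be illustrated with a small std-only model. This is a sketch under assumptions rather than Tokio's implementation: the budget is a plain `Cell<u32>`, the initial value of 128 is assumed, and `try_spend`, `poll_greedy`, `poll_sleep`, and `poll_join_once` are hypothetical names. `poll_sleep` stands in for a sleep whose deadline has already elapsed.

```rust
use std::cell::Cell;

/// Spends one unit of the shared budget if any is left.
fn try_spend(budget: &Cell<u32>) -> bool {
    let left = budget.get();
    if left == 0 {
        return false;
    }
    budget.set(left - 1);
    true
}

/// Models `non_cooperative_task`: every acquire succeeds immediately,
/// so the loop only stops once the budget is exhausted.
fn poll_greedy(budget: &Cell<u32>) -> u32 {
    let mut acquired = 0;
    while try_spend(budget) {
        acquired += 1;
    }
    acquired
}

/// Models a sleep whose deadline has passed: it can only complete
/// if there is still budget left to spend.
fn poll_sleep(budget: &Cell<u32>) -> bool {
    try_spend(budget)
}

/// One poll of a join-like combinator that polls its two branches
/// in a fixed order against one shared budget.
fn poll_join_once(initial: u32, greedy_first: bool) -> (u32, bool) {
    let budget = Cell::new(initial);
    if greedy_first {
        let acquired = poll_greedy(&budget);
        let sleep_done = poll_sleep(&budget);
        (acquired, sleep_done)
    } else {
        let sleep_done = poll_sleep(&budget);
        let acquired = poll_greedy(&budget);
        (acquired, sleep_done)
    }
}

fn main() {
    // 128 is an assumed initial budget for this model.
    let (acquired, sleep_done) = poll_join_once(128, true);
    println!("greedy first: acquired = {}, sleep done = {}", acquired, sleep_done);
    let (acquired, sleep_done) = poll_join_once(128, false);
    println!("sleep first:  acquired = {}, sleep done = {}", acquired, sleep_done);
}
```

In the model, the sleep branch only completes when it is polled before the greedy branch, because a fixed poll order hands the greedy branch a full budget every time.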

I changed the order of the futures passed to tokio::join! and changed the println! to a panic!, and it actually panics:

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(10));
                  //  HERE \/
    tokio::join!(poor_little_task(), non_cooperative_task(permits));
}

async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
        // uncommenting the following makes it work
        // tokio::time::sleep(Duration::from_millis(1)).await;
    }
}

async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        panic!("Hello!")
    }
}

I think this makes sense after looking at the expanded tokio::join!

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;
fn main() {
    let body = async {
        let permits = Arc::new(Semaphore::new(10));
        {
            use ::tokio::macros::support::{maybe_done, poll_fn, Future, Pin};
            use ::tokio::macros::support::Poll::{Ready, Pending};
            let mut futures = (
                maybe_done(poor_little_task()),
                maybe_done(non_cooperative_task(permits)),
            );
            poll_fn(move |cx| {
                let mut is_pending = false;
                let (fut, ..) = &mut futures;
                let mut fut = unsafe { Pin::new_unchecked(fut) };
                if fut.poll(cx).is_pending() {
                    is_pending = true;
                }
                let (_, fut, ..) = &mut futures;
                let mut fut = unsafe { Pin::new_unchecked(fut) };
                if fut.poll(cx).is_pending() {
                    is_pending = true;
                }
                if is_pending {
                    Pending
                } else {
                    Ready((
                        {
                            let (fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                        {
                            let (_, fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                    ))
                }
            })
            .await
        };
    };
    #[allow(clippy::expect_used)]
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(body);
}
async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
    }
}
async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        ::core::panicking::panic_fmt(::core::fmt::Arguments::new_v1(&["Hello!"], &[]))
    }
}

If we go back to the original order in which the futures were passed to tokio::join!, it does not panic:

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(10));

    tokio::join!(non_cooperative_task(permits), poor_little_task());
}

async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
        // uncommenting the following makes it work
        // tokio::time::sleep(Duration::from_millis(1)).await;
    }
}

async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        panic!("Hello!")
    }
}

@Darksonn
Contributor

Ah yes, you are right. So the issue is really with tokio::join! and not the semaphore.

PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue Apr 17, 2022
@PoorlyDefinedBehaviour
Contributor

I got it working #4624

What do you think?

@Darksonn
Contributor

Well, I am thinking that it would be better to just have the join! macro remember which one to poll next. It is not clear to me whether resetting the budget like that each time is a good idea.

@PoorlyDefinedBehaviour
Contributor

> Well, I am thinking that it would be better to just have the join! macro remember which one to poll next

I don't get it 🤔 What do you mean?

> It is not clear to me whether resetting the budget like that each time is a good idea.

Just out of curiosity, does this create any problems?

@Darksonn
Contributor

I mean, the second time the join! macro gets polled, start by polling the second future first so that the budget is not consumed.

As for whether it creates any issues, well, as far as I can tell with your implementation, this loop

loop {
    tokio::join!(sem.acquire());
}

would never run out of budget because the join! macro does not consume from the surrounding budget.

@PoorlyDefinedBehaviour
Contributor

> I mean, the second time the join! macro gets polled, start by polling the second future first so that the budget is not consumed.

Doesn't that mean that if I have two futures that consume resources that are always ready, the futures would stop making progress concurrently? Or do we not care about this case?

async fn always_ready(permits: Arc<Semaphore>) {
    loop {
        let _permit = permits.clone().acquire_owned().await.unwrap();
    }
}

let permits = Arc::new(Semaphore::new(10));
tokio::join!(
    /* call it A */ always_ready(Arc::clone(&permits)),
    /* call it B */ always_ready(permits),
);

poll poll_fn -> polls A (A consumes the whole budget and B makes no progress)
poll poll_fn -> polls B (B consumes the whole budget and A makes no progress)

@Darksonn
Contributor

Your dual always_ready example also doesn't work today, but it would also get fixed. First you poll A, then B. Then on the next poll you poll B, then A. Then you do A, then B. So they would both make progress.
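
The alternating order described above can be sketched as a small simulation. This is a std-only model under assumptions, not the code from the PR: `simulate` is a hypothetical name, each poll is assumed to start with a fresh budget of `budget_per_poll` units, and every branch is modelled as always ready, so whichever branch is polled first drains the whole budget.

```rust
/// Returns how many budget units each branch got to spend after `polls`
/// polls of a join-like combinator with `count` always-ready branches.
/// With `rotate`, the starting branch advances by one on every poll.
fn simulate(count: usize, polls: usize, budget_per_poll: u32, rotate: bool) -> Vec<u32> {
    let mut progress = vec![0u32; count];
    if count == 0 {
        return progress;
    }
    for poll in 0..polls {
        // Assumption: the task gets a fresh budget each time it is polled.
        let mut budget = budget_per_poll;
        let start = if rotate { poll % count } else { 0 };
        for offset in 0..count {
            let index = (start + offset) % count;
            // An always-ready branch spends everything that is left.
            progress[index] += budget;
            budget = 0;
        }
    }
    progress
}

fn main() {
    println!("fixed order:    {:?}", simulate(2, 4, 128, false));
    println!("rotating order: {:?}", simulate(2, 4, 128, true));
}
```

With a fixed order, branch B never spends any budget in this model. With the rotating order, A and B alternate in going first, so both make progress over time even though only one of them advances per poll.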

@PoorlyDefinedBehaviour
Contributor

PoorlyDefinedBehaviour commented Apr 24, 2022

> I mean, the second time the join! macro gets polled, start by polling the second future first so that the budget is not consumed.

Updated my PR (#4624). I added 4 bytes to the size of the future generated by tokio::join! though 😅

If everything is ok, maybe we can use i8 instead of u32.

PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue Apr 24, 2022
@Darksonn Darksonn added the M-coop Module: tokio/coop label Apr 24, 2022
PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue Apr 25, 2022
PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue May 1, 2022
PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue May 2, 2022
PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue May 3, 2022
PoorlyDefinedBehaviour added a commit to PoorlyDefinedBehaviour/tokio that referenced this issue May 5, 2022
Darksonn pushed a commit that referenced this issue May 7, 2022