FuturesUnordered: Respect yielding from future #2551

Merged
taiki-e merged 3 commits into master from taiki-e/futures-unordered on Feb 6, 2022

taiki-e
Member

@taiki-e taiki-e commented Jan 13, 2022

If a future yields, FuturesUnordered now respects that and yields too.
In this patch, if a future is woken before its poll has finished, we assume that the future is yielding.
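
A minimal sketch of the detection idea, using only `std`. This is not the code from the patch: `FlagWaker`, `YieldOnce`, `NeverWakes`, and `poll_once_and_detect_yield` are hypothetical names introduced for illustration. The child future is polled with a waker that sets a flag, and the flag is inspected as soon as `poll` returns.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that only records that it was woken.
struct FlagWaker {
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// A future that yields once: it wakes itself and returns `Pending`
/// on the first poll, then completes on the second poll.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A future that stays pending and never wakes itself during poll.
struct NeverWakes;

impl Future for NeverWakes {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Polls `fut` once and returns `(is_pending, woken_during_poll)`.
/// A wake-up observed before `poll` returns is treated as a yield.
fn poll_once_and_detect_yield<F: Future + Unpin>(fut: &mut F) -> (bool, bool) {
    let flag = Arc::new(FlagWaker { woken: AtomicBool::new(false) });
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let is_pending = Pin::new(fut).poll(&mut cx).is_pending();
    (is_pending, flag.woken.load(Ordering::SeqCst))
}
```

In this sketch, `YieldOnce` is reported as pending and woken during its poll, while `NeverWakes` is reported as pending but not woken.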

This is the approach suggested by @carllerche on Discord (if I understand correctly).
I also noticed that stjepang had suggested this approach earlier: rust-lang/rust#74335 (comment)

cc @cramertj @jonhoo
cc #2053


The following are the results of running the example from #2526, with a counter added for the number of times the futures were polled.

code
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::Mutex;

static COUNT: AtomicUsize = AtomicUsize::new(0);

#[pin_project::pin_project]
struct PollCounter<F>(#[pin] F);

impl<F: Future> Future for PollCounter<F> {
    type Output = F::Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut futures_task::Context<'_>,
    ) -> futures_task::Poll<Self::Output> {
        COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        self.project().0.poll(cx)
    }
}

async fn do_task(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    COUNT.store(0, std::sync::atomic::Ordering::SeqCst);
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(PollCounter(do_task(mutex.clone())));
    }
    let mut futs_unordered: FuturesUnordered<_> = futs.into_iter().collect();

    std::mem::drop(mutex_guard);

    for _ in 0..n {
        futs_unordered.select_next_some().await;
    }

    println!(
        "n: {}, time: {}ms, polled: {}",
        n,
        start_time.elapsed().as_millis(),
        COUNT.load(std::sync::atomic::Ordering::SeqCst)
    );
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}

before:

n: 10000, time: 6ms, polled: 12481
n: 20000, time: 13ms, polled: 24992
n: 40000, time: 25ms, polled: 49984
n: 80000, time: 50ms, polled: 100000
n: 160000, time: 96ms, polled: 200000

after:

n: 10000, time: 6ms, polled: 10078
n: 20000, time: 13ms, polled: 20156
n: 40000, time: 25ms, polled: 40312
n: 80000, time: 48ms, polled: 80625
n: 160000, time: 93ms, polled: 161250
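
To make the difference easier to read, the poll counts above can be turned into polls per future. The sketch below only re-derives ratios from the numbers printed above; `RESULTS`, `polls_per_future`, and `ratios` are names made up for this illustration.

```rust
/// (n, polls before, polls after), copied from the output above.
const RESULTS: [(u64, u64, u64); 5] = [
    (10_000, 12_481, 10_078),
    (20_000, 24_992, 20_156),
    (40_000, 49_984, 40_312),
    (80_000, 100_000, 80_625),
    (160_000, 200_000, 161_250),
];

/// Average number of polls needed per future.
fn polls_per_future(polled: u64, n: u64) -> f64 {
    polled as f64 / n as f64
}

/// Returns (n, polls per future before, polls per future after) per run.
fn ratios() -> Vec<(u64, f64, f64)> {
    RESULTS
        .iter()
        .map(|&(n, before, after)| {
            (n, polls_per_future(before, n), polls_per_future(after, n))
        })
        .collect()
}
```

For these runs, that is about 1.25 polls per future before the patch and about 1.008 after, with wall-clock time roughly unchanged.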

@taiki-e taiki-e added 0.3-backport: pending The maintainer accepted to backport this to the 0.3 branch, but backport has not been done yet. A-stream Area: futures::stream labels Jan 13, 2022
Contributor

@Darksonn Darksonn left a comment

This seems good to me.

Contributor

@jonhoo jonhoo left a comment

I'm okay with the change as-is, but see my one concern around over-eager yielding.

futures-util/src/stream/futures_unordered/mod.rs (outdated)
// avoid starving other tasks waiting on the executor.
// (polling the same future twice per iteration may cause
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
if yielded || polled == len {
Contributor

I worry a little about this being too eager to yield. We're saying here that if any future is awoken while we're polling it (even if it's due to something else legitimately waking it) then we're going to yield from the entire FuturesUnordered. I wonder if it makes more sense to require observing, say, two such yields before concluding we should yield? That buffers us slightly against yielding unnecessarily while still making us yield fairly quickly when everything inside of us will yield going forward.
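
A rough sketch of what such a threshold could look like, as a simplified model rather than the code in futures-util. `PollOutcome`, `YIELD_THRESHOLD`, and `polls_before_yield` are hypothetical names, and the real poll loop does much more (task queues, completion handling). Only the stop condition is modeled here.

```rust
/// Outcome of polling one child future in this simplified model.
#[derive(Clone, Copy)]
struct PollOutcome {
    /// Whether the child's waker fired while the child was being polled.
    woken_during_poll: bool,
}

/// Assumed threshold: the comment above suggests "say, two".
const YIELD_THRESHOLD: usize = 2;

/// Walks the outcomes in order and returns how many children were polled
/// before the loop decides to yield back to the executor.
fn polls_before_yield(outcomes: &[PollOutcome]) -> usize {
    let len = outcomes.len();
    let mut polled = 0;
    let mut yielded = 0;
    for outcome in outcomes {
        polled += 1;
        // Count children that were woken while they were being polled.
        yielded += outcome.woken_during_poll as usize;
        // Stop after enough observed yields, or once every child has
        // been polled once in this pass.
        if yielded >= YIELD_THRESHOLD || polled == len {
            break;
        }
    }
    polled
}
```

With this condition, a single child that is woken during its poll does not end the pass on its own; the pass ends at the second such child or after every child has been polled once, whichever comes first.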

Member Author

I adopted this heuristic. (A thread-ID-based check is not an option here: FuturesUnordered needs to support no_std, where thread IDs are not available.) Thanks for the suggestion.

@Darksonn
Contributor

Another possible implementation would be to check if the thread ID in the waker is equal to the thread currently polling the FuturesUnordered, if any.
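
One possible reading of this idea, sketched with `std` only. This is an interpretation for illustration, not code from the PR: `ThreadRecordingWaker`, `woken_by_current_thread`, `wake_from_same_thread`, and `wake_from_other_thread` are hypothetical names. The waker records which thread called `wake`, and the polling thread compares that against its own ID.

```rust
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};
use std::thread::{self, ThreadId};

/// Hypothetical waker that remembers which thread woke it last.
struct ThreadRecordingWaker {
    last_waker_thread: Mutex<Option<ThreadId>>,
}

impl Wake for ThreadRecordingWaker {
    fn wake(self: Arc<Self>) {
        *self.last_waker_thread.lock().unwrap() = Some(thread::current().id());
    }
}

impl ThreadRecordingWaker {
    fn new() -> Arc<Self> {
        Arc::new(Self { last_waker_thread: Mutex::new(None) })
    }

    /// True if the last wake-up came from the thread calling this method,
    /// which in this model is the thread doing the polling.
    fn woken_by_current_thread(&self) -> bool {
        *self.last_waker_thread.lock().unwrap() == Some(thread::current().id())
    }
}

/// Wake from the polling thread itself: treated as a self-wake (a yield).
fn wake_from_same_thread() -> bool {
    let state = ThreadRecordingWaker::new();
    let waker = Waker::from(state.clone());
    waker.wake_by_ref();
    state.woken_by_current_thread()
}

/// Wake from another thread: not treated as a yield.
fn wake_from_other_thread() -> bool {
    let state = ThreadRecordingWaker::new();
    let waker = Waker::from(state.clone());
    thread::spawn(move || waker.wake()).join().unwrap();
    state.woken_by_current_thread()
}
```

Note that this relies on `std::thread::current()`, so it needs `std`.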

@Darksonn
Contributor

Any updates on this?

taiki-e and others added 2 commits February 6, 2022 13:01
@taiki-e taiki-e force-pushed the taiki-e/futures-unordered branch 2 times, most recently from 3002f79 to 8136c37 Compare February 6, 2022 04:06
@taiki-e
Member Author

taiki-e commented Feb 6, 2022

@jonhoo @Darksonn Thanks to both of you for the reviews!

I've addressed all the review comments, so this should be ready to merge now.

@taiki-e taiki-e merged commit 8bc9f1e into master Feb 6, 2022
@taiki-e taiki-e deleted the taiki-e/futures-unordered branch February 6, 2022 04:39
taiki-e added a commit that referenced this pull request Feb 6, 2022
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
@taiki-e taiki-e mentioned this pull request Feb 6, 2022
@taiki-e taiki-e added 0.3-backport: completed and removed 0.3-backport: pending The maintainer accepted to backport this to the 0.3 branch, but backport has not been done yet. labels Feb 6, 2022
taiki-e added a commit that referenced this pull request Feb 6, 2022
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
@taiki-e taiki-e mentioned this pull request Feb 6, 2022
@taiki-e
Member Author

taiki-e commented Feb 6, 2022

Published in 0.3.20.
