
Quadratic complexity in FuturesUnordered #2526

Closed
cvybhu opened this issue Nov 30, 2021 · 3 comments · Fixed by #2527
Comments

cvybhu commented Nov 30, 2021

There might be a performance bug in FuturesUnordered.

Here's an example that puts n async futures into FuturesUnordered and waits for all of them to finish.
Each task locks an async mutex and immediately releases it, which causes them to finish one by one.

It looks like the execution time is quadratic: each time n doubles, the time roughly quadruples.

n: 10000, time: 23ms
n: 20000, time: 89ms
n: 40000, time: 338ms
n: 80000, time: 1304ms
n: 160000, time: 5181ms

Documentation states:

Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications.

and the implementation tries hard to avoid quadratic complexity, so it looks like a bug.

We ran into this issue when trying to perform many concurrent database queries in scylladb/scylla-rust-driver#362.

main.rs:

use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(do_task(mutex.clone()));
    }
    let mut futs_unordered: FuturesUnordered<_> = futs.into_iter().collect();

    std::mem::drop(mutex_guard);

    for _ in 0..n {
        futs_unordered.select_next_some().await;
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}

Cargo.toml:

[package]
name = "futures-unordered-slow"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "=1.14.0", features = ["rt-multi-thread", "sync", "macros"] }
futures = "=0.3.18"

Doing this using tokio::spawn makes the time linear again:
main.rs:

use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(tokio::spawn(do_task(mutex.clone())));
    }

    std::mem::drop(mutex_guard);

    for f in futs {
        f.await.unwrap();
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}
n: 10000, time: 6ms
n: 20000, time: 12ms
n: 40000, time: 25ms
n: 80000, time: 52ms
n: 160000, time: 104ms
taiki-e (Member) commented Nov 30, 2021

This is due to compatibility with tokio's cooperative scheduling (which tokio's Mutex participates in).

This can be avoided by using tokio::task::unconstrained. For example:

  async fn main() {
      for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
-         benchmark(n).await;
+         tokio::task::unconstrained(benchmark(n)).await;
      }
  }

Before:

n: 10000, time: 315ms
n: 20000, time: 1202ms
n: 40000, time: 4774ms
n: 80000, time: 18880ms
(n: 160000 timed out)

After:

n: 10000, time: 25ms
n: 20000, time: 49ms
n: 40000, time: 94ms
n: 80000, time: 180ms
n: 160000, time: 350ms

See #2053 for more.

cvybhu (Author) commented Nov 30, 2021

Ohh, I didn't know tokio does that, so my understanding is:

  • The tokio executor polls main, which keeps polling FuturesUnordered in a loop.
  • FuturesUnordered has a list of futures to poll; each of them would be ready immediately because it locks a free mutex.
  • But tokio's cooperative scheduling means that after a few futures complete, the rest report that they are pending and then immediately get rescheduled for polling.
  • FuturesUnordered goes through them anyway and then yields.
  • On the next poll the situation repeats: each time only a few futures complete, but we still walk through all the rest.
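A crude model of those steps (plain Rust, no tokio; the budget of 128 is a hypothetical stand-in for tokio's coop budget, which may differ) shows why the poll count grows quadratically:

```rust
// Crude simulation of the behavior above: each outer poll of
// FuturesUnordered walks every queued future, but the cooperative
// budget only lets `budget` of them actually complete per pass.
fn total_polls(n: usize, budget: usize) -> usize {
    let mut remaining = n;
    let mut polls = 0;
    while remaining > 0 {
        // FuturesUnordered polls every future that was woken...
        polls += remaining;
        // ...but only `budget` of them complete before the rest start
        // reporting Pending and re-wake themselves.
        remaining -= budget.min(remaining);
    }
    polls
}

fn main() {
    // With a fixed budget, doubling n roughly quadruples the poll count.
    for n in [1_000, 2_000, 4_000] {
        println!("n = {}: {} polls", n, total_polls(n, 128));
    }
}
```

Each of the roughly n/budget passes walks the whole remaining list, giving on the order of n²/budget polls in total.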

Thanks, wow, that's dangerous.

Using tokio::task::unconstrained would cause us to block the reactor, so a good fix is probably to call tokio::task::yield_now() after receiving each result from FuturesUnordered.

use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;

async fn do_task(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn benchmark(n: usize) {
    let start_time: std::time::Instant = std::time::Instant::now();

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(do_task(mutex.clone()));
    }
    let mut futs_unordered: FuturesUnordered<_> = futs.into_iter().collect();

    std::mem::drop(mutex_guard);

    for _ in 0..n {
        futs_unordered.select_next_some().await;
        tokio::task::yield_now().await;
    }

    println!("n: {}, time: {}ms", n, start_time.elapsed().as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        benchmark(n).await;
    }
}
n: 10000, time: 2ms
n: 20000, time: 5ms
n: 40000, time: 10ms
n: 80000, time: 20ms
n: 160000, time: 40ms

taiki-e (Member) commented Nov 30, 2021

I found a way to mitigate this on the FuturesUnordered side: #2527

After #2527, the benchmark result became:

n: 10000, time: 29ms
n: 20000, time: 55ms
n: 40000, time: 104ms
n: 80000, time: 202ms
n: 160000, time: 392ms

A little slower than the version using unconstrained, but a little faster than the version using tokio::spawn, on my machine.
See that PR for more.
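As a rough model only (the real change lives in the futures crate; the cap of 32 here is illustrative, not necessarily the value #2527 uses), capping how many consecutive pending futures FuturesUnordered polls before yielding itself makes the total work linear:

```rust
// Rough model of the mitigation: after the cooperative budget is
// exhausted, FuturesUnordered polls at most `cap` more (pending)
// futures, then yields back to the executor instead of walking the
// whole queue of woken futures.
fn total_polls_capped(n: usize, budget: usize, cap: usize) -> usize {
    let mut remaining = n;
    let mut polls = 0;
    while remaining > 0 {
        let completed = budget.min(remaining);
        polls += completed; // polls that complete a future this pass
        remaining -= completed;
        // The remaining futures would report Pending; only `cap` of
        // them are even polled before FuturesUnordered yields.
        polls += cap.min(remaining);
    }
    polls
}

fn main() {
    // Doubling n now roughly doubles the poll count: linear, not quadratic.
    for n in [1_000, 2_000, 4_000] {
        println!("n = {}: {} polls", n, total_polls_capped(n, 128, 32));
    }
}
```

In this model each pass costs budget + cap polls instead of O(remaining), so the total is about n · (1 + cap/budget), which matches the roughly linear timings above.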
