
Scalability issue visible when the number of concurrent requests > 128 #362

Open
pkolaczk opened this issue Nov 26, 2021 · 56 comments

@pkolaczk

# on latte master branch (cassandra_cpp):
$ target/release/latte run workloads/basic/read.rn -d 1000000 -t 2 --tag cassandra_cpp

# on scylla_driver_rebase branch:
$ target/release/latte run workloads/basic/read.rn -d 1000000 -t 2 --tag scylla -b read.Test_Cluster.4.0.1-SNAPSHOT.cassandra_cpp.p384.t2.c1.20211126.163116.json


CONFIG ═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════
                            ───────────── A ─────────────  ────────────── B ────────────     Change     
            Date            Fri, 26 Nov 2021               Fri, 26 Nov 2021                           
            Time            16:31:08 +0100                 16:33:03 +0100                             
         Cluster            Test Cluster                   Test Cluster                               
      C* version            4.0.1-SNAPSHOT                 4.0.1-SNAPSHOT                             
            Tags            cassandra_cpp                  scylla                                     
        Workload            read.rn                        read.rn                                    
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
         Threads                    2                              2                          +0.0%     
     Connections                    1                              1                          +0.0%     
     Concurrency     [req]        384                            384                          +0.0%     
        Max rate    [op/s]                                                                            
          Warmup       [s]                                                                            
              └─      [op]          1                              1                          +0.0%     
        Run time       [s]                                                                            
              └─      [op]    1000000                        1000000                          +0.0%     
        Sampling       [s]        1.0                            1.0                          +0.0%     

LOG ════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════
    Time  ───── Throughput ─────  ────────────────────────────────── Response times [ms] ───────────────────────────────────
     [s]      [op/s]     [req/s]         Min        25        50        75        90        95        99      99.9       Max
   0.000      137155      137155       0.645     3.923     4.895     6.227     7.915     9.391    12.663    17.263    23.727
   1.000      140517      140517       0.637     3.773     4.735     6.151     7.703     9.055    12.271    21.007    40.799
   2.000      137920      137920       1.092     3.715     4.707     6.167     8.159     9.503    15.359    28.223    39.071
   3.000      135219      135219       0.964     3.863     4.879     6.223     8.003     9.655    16.335    28.127    33.567
   4.000      138897      138897       0.780     3.799     4.819     6.367     8.031     9.055    11.487    18.319    31.967
   5.000      133410      133410       0.331     3.843     5.011     6.575     8.327     9.751    14.055    20.511    27.663
   6.000      137751      137751       0.587     3.799     4.787     6.227     8.163     9.431    12.511    27.423    42.399
   7.000      144696      144696       0.914     3.909     4.827     6.055     7.519     8.287    10.071    11.375    23.007

SUMMARY STATS ══════════════════════════════════════════════════════════════════════════════════════════════════════════════
                            ───────────── A ─────────────  ────────────── B ────────────     Change     P-value  Signif.
    Elapsed time       [s]      6.942                          7.271                          +4.7%     
        CPU time       [s]     12.746                         12.985                          +1.9%     
 CPU utilisation       [%]       23.0                           22.3                          -2.7%     
           Calls      [op]    1000000                        1000000                          +0.0%     
          Errors      [op]          0                              0                          +0.0%     
              └─       [%]        0.0                            0.0                          +0.0%     
        Requests     [req]    1000000                        1000000                          +0.0%     
              └─  [req/op]        1.0                            1.0                          +0.0%     
            Rows     [row]     500425                         499581                          -0.2%     
              └─ [row/req]        0.5                            0.5                          -0.2%     
         Samples                    7                              8                         +14.3%     
Mean sample size      [op]     142857                         125000                         -12.5%     
              └─     [req]     142857                         125000                         -12.5%     
     Concurrency     [req]      297.8                          360.0                         +20.9%     
              └─       [%]       77.5                           93.8                         +20.9%     
      Throughput    [op/s]     144068 ± 2123                  137543 ± 3159                   -4.5%     0.00011  **   
              ├─   [req/s]     144068 ± 2123                  137543 ± 3159                   -4.5%     0.00011  **   
              └─   [row/s]      72095 ± 1229                   68714 ± 1363                   -4.7%     0.00004  ***  
  Mean call time      [ms]      4.627 ± 0.103                  5.333 ± 0.120                 +15.2%     0.00000  *****
 Mean resp. time      [ms]      4.622 ± 0.103                  5.330 ± 0.120                 +15.3%     0.00000  *****


RESPONSE TIMES [ms] ════════════════════════════════════════════════════════════════════════════════════════════════════════
                            ───────────── A ─────────────  ────────────── B ────────────     Change     P-value  Signif.
          Min                   0.344 ± 0.108                  0.728 ± 0.286                +111.7%     0.00260  *    
           25                   2.498 ± 0.090                  3.820 ± 0.079                 +52.9%     0.00000  *****
           50                   3.781 ± 0.107                  4.832 ± 0.117                 +27.8%     0.00000  *****
           75                   5.621 ± 0.189                  6.267 ± 0.175                 +11.5%     0.00000  **** 
           90                   8.105 ± 0.369                  8.021 ± 0.258                  -1.0%     0.55184       
           95                  10.405 ± 0.543                  9.359 ± 0.405                 -10.1%     0.00031  **   
           98                  14.847 ± 1.472                 11.281 ± 0.747                 -24.0%     0.00006  ***  
           99                  19.435 ± 3.130                 13.376 ± 2.164                 -31.2%     0.00028  **   
           99.9                28.216 ± 6.287                 22.519 ± 6.067                 -20.2%     0.05167       
           99.99               30.291 ± 3.934                 25.367 ± 6.235                 -16.3%     0.04922       
          Max                  34.476 ± 5.021                 33.786 ± 8.295                  -2.0%     0.81925       

So I ported the current master of latte to the scylladb driver, and I noticed a weird issue - minimum and average latencies are much higher than with cassandra_cpp, and throughput is still a bit behind in this test (even though this test is short, the phenomenon exists also when running it for longer). I've run it a few times, also in reverse order, and it is very repeatable. The difference is larger on a single thread.

Are there any tuning options for the buffering thing you've added recently?

BTW: not sure why the number of returned rows is only half the number of queries, but that looks like a bug in the workload generation / definition; it's exactly the same code on both sides, so it shouldn't affect the validity of this test.

@pkolaczk
Author

Interestingly, when I decrease the number of concurrent requests from 384 to only 64, the scylla driver now outperforms cassandra_cpp on almost every metric (except min latency). I just wonder, maybe the internal driver queue / buffer is too small?

@pkolaczk pkolaczk changed the title More performance issues: min, p25, p50 and p75 latencies still higher than cassandra_cpp Weird performance behavior: min, p25, p50 and p75 latencies still higher than cassandra_cpp despite lower throughput Nov 26, 2021
@pkolaczk
Author

BTW: I'm not saying these latencies from scylla are worse - actually, in a REAL app I'd take lower tail latency over a larger min latency every time. However, I'm just curious why I'm getting those numbers. Maybe there is an opportunity to improve something even more.

@pkolaczk
Author

Looking at perf stats, one thing I notice is that the scylla branch spends a lot more time in the kernel (sys time) and makes 2x more context switches:

master:

 Performance counter stats for './target/release/latte run workloads/basic/read.rn -d 1000000':

         13 215,82 msec task-clock                #    1,362 CPUs utilized          
            51 499      context-switches          #    3,897 K/sec                  
             1 432      cpu-migrations            #  108,355 /sec                   
             7 094      page-faults               #  536,781 /sec                   
    39 277 452 851      cycles                    #    2,972 GHz                    
    32 477 310 140      instructions              #    0,83  insn per cycle         
     6 607 178 073      branches                  #  499,945 M/sec                  
        70 631 696      branch-misses             #    1,07% of all branches        

       9,701747340 seconds time elapsed

      11,554137000 seconds user
       1,807549000 seconds sys

scylla-driver-rebase:

 Performance counter stats for './target/release/latte run workloads/basic/read.rn -d 1000000':

         13 170,58 msec task-clock                #    1,226 CPUs utilized          
           101 559      context-switches          #    7,711 K/sec                  
             4 966      cpu-migrations            #  377,052 /sec                   
             1 592      page-faults               #  120,875 /sec                   
    38 972 668 976      cycles                    #    2,959 GHz                    
    36 413 811 976      instructions              #    0,93  insn per cycle         
     6 182 693 111      branches                  #  469,432 M/sec                  
        53 918 503      branch-misses             #    0,87% of all branches        

      10,740003985 seconds time elapsed

      10,124568000 seconds user
       3,358842000 seconds sys

@pkolaczk
Author

pkolaczk commented Nov 27, 2021

Both runs execute 1000000 iterations of the read.rn benchmark, single thread:

scylla-driver-rebase:
           127 260      syscalls:sys_enter_recvfrom  
            55 854      syscalls:sys_enter_sendto                                   
            75 392      syscalls:sys_enter_epoll_wait                                   

master:
            62 662      syscalls:sys_enter_read                                     
            10 492      syscalls:sys_enter_write                                    
            16 730      syscalls:sys_enter_writev  
            45 434      syscalls:sys_enter_epoll_wait                                   

Although there is a huge improvement since 0.3.0, it looks like the scylla driver still makes significantly more syscalls than cassandra_cpp. It looks like it splits the data into smaller chunks, so the system overhead is higher.

Can we have an option to configure the scylla driver to make bigger batches when high throughput is more important than latency? Or simply when a high number of concurrent requests are scheduled (many pending calls to Session::execute)?

I noticed there is a hardcoded buffer size of 8196 bytes in your code. I was wondering - does the size of that buffer affect the batching behaviour? Maybe it would make sense to expose its size in the options?
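For context, the general pattern in question is a buffered writer whose capacity decides how many small frames get coalesced before a syscall - a rough sketch (not the driver's actual code; the address and payload are placeholders):

use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;

async fn buffered_write_example() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:9042").await?;
    let (_read_half, write_half) = stream.into_split();
    // 8 KiB stands in for the hardcoded size discussed above; making this
    // configurable is what's being proposed.
    let mut buffered = BufWriter::with_capacity(8 * 1024, write_half);
    buffered.write_all(b"...serialized CQL frame...").await?;
    buffered.flush().await?; // the accumulated bytes are written to the socket here (typically one syscall)
    Ok(())
}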

BTW: On a fixed rate test I'm getting a lot better latencies from scylla driver, in particular p75-p99.9.

@psarna
Contributor

psarna commented Nov 27, 2021

Indeed, increasing buffer sizes will increase throughput at the cost of latency, up to a point. The whole idea is to find a sweet spot for your workload and hardware, and you're right that it's not currently possible in scylla-rust-driver, because the sizes are hardcoded. I was planning to push a patch that makes the sizes configurable - I'll put it higher on my priority list and push a pull request early next week.

@psarna psarna self-assigned this Nov 27, 2021
@pkolaczk
Author

Cool, let me quickly fork it then and check if changing that buffer size makes any difference first ;)

@psarna
Contributor

psarna commented Nov 27, 2021

Sure. Btw, no need to fork the repo - once I send a pull request to a branch, you can specify a git branch as a dependency in Cargo.toml: scylla = {git = "https://github.com/scylladb/scylla-rust-driver", branch="some-branch"}
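For reference, written out as a Cargo.toml section (the branch name is just the placeholder from above):

[dependencies]
scylla = { git = "https://github.com/scylladb/scylla-rust-driver", branch = "some-branch" }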

@pkolaczk
Author

I increased the buffer size to 32k and can't see any change in the number of recvfrom or sendto calls. Huh.

@pkolaczk
Author

What is the policy for flushing those buffers? Also, what controls the frequency of reading from the socket into the read buffer? Maybe it reads so frequently that there isn't enough data in the socket to read?

@pkolaczk
Author

pkolaczk commented Nov 27, 2021

So I guess this is the main loop:

while let Some(mut task) = task_receiver.recv().await {
    let mut num_requests = 0;
    let mut total_sent = 0;
    while let Some(stream_id) = Self::alloc_stream_id(handler_map, task.response_handler) {
        let mut req = task.serialized_request;
        req.set_stream(stream_id);
        let req_data: &[u8] = req.get_data();
        total_sent += req_data.len();
        num_requests += 1;
        write_half.write_all(req_data).await?;
        task = match task_receiver.try_recv() {
            Ok(t) => t,
            Err(_) => break,
        }
    }
    trace!("Sending {} requests; {} bytes", num_requests, total_sent);
    write_half.flush().await?;
}

After enabling logging, I noticed that it only sends a full buffer of requests initially, and then it quickly drops down to sending just a few requests at a time:

Sending 128 requests; 5888 bytes
Sending 128 requests; 5888 bytes                                               
Sending 128 requests; 5888 bytes
Sending 40 requests; 1840 bytes
Sending 268 requests; 12328 bytes
Sending 75 requests; 3450 bytes
Sending 1 requests; 46 bytes
Sending 40 requests; 1840 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 2 requests; 92 bytes
Sending 1 requests; 46 bytes
Sending 2 requests; 92 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 384 requests; 17664 bytes
Sending 101 requests; 4646 bytes
Sending 231 requests; 10626 bytes
Sending 154 requests; 7084 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 53 requests; 2438 bytes
Sending 43 requests; 1978 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes
Sending 1 requests; 46 bytes

The client app uses buffer_unordered to limit the number of pending request futures. So, once it hits that limit (384 futures by default), it stops sending more requests to the driver and waits for some of the pending ones to finish. After some time Cassandra responds and a query result arrives. Because all of this is so low-latency on the Rust side, the driver picks up the result almost instantly, the number of pending request futures drops by just one or a few, but not by 384 (this is what I see in the logs), and the writer loop gets to spin another cycle. But it only gets a few tasks to process, so it runs out of tasks long before the buffer fills up, and flushes whatever was buffered.

Hence, the buffer size does not matter that much - the driver is very "eager": whatever comes in, it jumps to process it immediately and doesn't let requests accumulate.
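For reference, a minimal sketch of that client-side pattern (not latte's actual code; execute_query is a stand-in for the call into the driver):

use futures::stream::{self, StreamExt};

// Stand-in for a call into the driver, e.g. session.execute(...).
async fn execute_query(_i: u64) {}

async fn run_requests(parallelism: usize) {
    stream::iter(0..1_000_000u64)
        .map(|i| execute_query(i))
        .buffer_unordered(parallelism) // at most `parallelism` request futures pending
        .for_each(|_result| async {})  // results are consumed as they complete, in any order
        .await;
}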

BTW: This problem was recognized by the creators of the DataStax C++ driver, which has a configurable coalescing delay (200 µs by default): the driver waits for more requests in order to fill up the buffer, so if the buffer doesn't fill up, it adds at worst those 200 µs to the latency, but saves many syscalls by potentially batching more queries at once.

If you wish, I can try to implement such a configurable delay if time permits this weekend (it could be turned off by default so it wouldn't cause any regression for latency-sensitive workloads).
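A rough sketch of what such a delay could look like on top of tokio (my own illustration, not the driver's code; the channel of serialized frames, the 200 µs delay and the byte threshold are all assumptions for the example):

use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::time::{timeout_at, Instant};

async fn coalescing_writer<W: AsyncWrite + Unpin>(
    mut write_half: W,
    mut requests: mpsc::Receiver<Vec<u8>>, // serialized frames (placeholder type)
    delay: Duration,                       // e.g. 200 µs
    max_bytes: usize,                      // e.g. 8 KiB
) -> std::io::Result<()> {
    while let Some(first) = requests.recv().await {
        let deadline = Instant::now() + delay;
        let mut buffered = first.len();
        write_half.write_all(&first).await?;
        // Keep pulling requests until the deadline passes or enough bytes accumulate.
        while buffered < max_bytes {
            match timeout_at(deadline, requests.recv()).await {
                Ok(Some(frame)) => {
                    buffered += frame.len();
                    write_half.write_all(&frame).await?;
                }
                _ => break, // deadline reached or channel closed
            }
        }
        // Assuming write_half is buffered (e.g. a BufWriter), the accumulated
        // batch actually hits the socket here.
        write_half.flush().await?;
    }
    Ok(())
}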

@psarna
Contributor

psarna commented Nov 27, 2021

Of course, all contributors are welcome. We'll probably want to preserve current behavior as default, but allowing the driver to explicitly accumulate the requests for X microseconds sounds like a very nice tuning option.

@pkolaczk
Author

pkolaczk commented Nov 28, 2021

I've spent a bit of time today on this, and there is something weird going on, but I haven't figured that out yet. :(

Because I couldn't find anything in tokio to do an async sleep with submillisecond timing, my first attempt was to just spin on tokio::task::yield_now().await for ~200 µs or until the buffer filled up to at least 8k - well, that did reduce the number of write syscalls, but increased the number of epoll calls, and overall performance and CPU utilisation were basically the same.

while let Some(mut task) = task_receiver.recv().await {
    let start_time = Instant::now();
    let mut num_requests = 0;
    let mut total_sent = 0;
    'outer: while let Some(stream_id) =
        Self::alloc_stream_id(handler_map, task.response_handler)
    {
        let mut req = task.serialized_request;
        req.set_stream(stream_id);
        let req_data: &[u8] = req.get_data();
        total_sent += req_data.len();
        num_requests += 1;
        write_half.write_all(req_data).await?;
        task = loop {
            match task_receiver.try_recv() {
                Ok(t) => break t,
                Err(_) => {
                    if Instant::now() - start_time < tokio::time::Duration::from_micros(200)
                        && total_sent < 8196
                    {
                        tokio::task::yield_now().await;
                    } else {
                        break 'outer;
                    }
                }
            }
        }
    }
    trace!("Sending {} requests; {} bytes", num_requests, total_sent);
    write_half.flush().await?;
}

Then I tried blocking with std::thread::sleep, just as an experiment (I know it is totally not acceptable to do so - but I'm just breaking things to learn better how they work ;)), and this time it reduced the number of syscalls significantly and reduced CPU usage a bit, but the overall throughput stayed the same. Still couldn't match the throughput I am getting with cassandra_cpp.

So I conclude that maybe those syscalls are a red herring and the real issue is somewhere else (or there is no problem and I'm nitpicking, and 100k max throughput vs 120k with 1 thread is not such a big difference, and with 4 threads it's only a tad slower).

However, I tried one more thing - what happens if I increase the number of concurrent queries to a crazy value on the latte side? It is easy to do so with -p. And there I found something big:

 ./target/release/latte run workloads/basic/read.rn -d 1000000 -p 4096

     Concurrency     [req]     4071.6                                                                 
              └─       [%]       99.4                                                                 
      Throughput    [op/s]      23979 ± 1508                                                          
              ├─   [req/s]      23979 ± 1508                                                          
              └─   [row/s]      23979 ± 1508  

Wait, what?! 24k req/s? 5x lower?
Interestingly, during this test my local cassandra node sits half idle, top reports Cassandra using about 160% of a core (out of 4 physical cores), and total CPU is about 60-70% idle.
The queue is full on the latte side, there are 4071 pending requests on average at the moment of scheduling a new request.

Let's try that with master branch using cassandra_cpp:

./target/release/latte run workloads/basic/read.rn -d 1000000 -p 4096

     Concurrency     [req]     1742.2                                                                 
              └─       [%]       42.5                                                                 
      Throughput    [op/s]     131797 ± 6877                                                          
              ├─   [req/s]     131797 ± 6877                                                          
              └─   [row/s]     131797 ± 6877         

Now Cassandra is also a lot more loaded, getting about 450-500% of a core; total idle is about 15-20% (still not sure why, but it has always been like that, so it might be a problem in Cassandra).

So overall - it looks like there is a bottleneck somewhere when a crazy number of futures is outstanding. Can you try with -p 4096 (or even higher values) and check if you can also reproduce that? Generally, the higher you go, the more the throughput drops, while with cassandra_cpp the throughput first goes up, then flattens, but does not drop - further increasing the number of concurrent requests only increases the latency.

@pkolaczk
Author

Seems to be inversely linear. Going from 4096 to 8152 concurrent queries almost exactly halves the throughput. I'm using a single-threaded tokio runtime here, so it is likely not classic thread contention.

@pkolaczk
Author

pkolaczk commented Nov 28, 2021

I made a flamegraph and it is hugely confusing to me.
flamegraph.zip

First, it doesn't show that the scylla request takes a huge amount of time, at least not directly.
It shows a lot of time being spent in... Future.

Also, when I change the workload to workloads/basic/empty.rn (on the same branch, no recompilation needed), which does not call into the driver at all, yet still invokes the Rune scripting engine and basically runs the same code path on the latte side – I'm getting 1.5 M calls per second, regardless of the -p setting. 1.5 million vs 12k. Also, with cassandra_cpp there is no such behaviour.
So it kind of works as if polling the scylla future made everything slow, but not the scylla driver code itself.

I wonder, could the future returned from scylla execute be deeply nested - e.g. aren't we running into the problem described here: https://internals.rust-lang.org/t/questioning-the-design-of-the-future-trait-and-the-cost-implied-by-it/14217 ?

@pkolaczk
Author

pkolaczk commented Nov 28, 2021

Ok, so I think I got it. Indeed, the driver must be running into a recursive Future issue somewhere.
I wrapped the call to session.execute in its own task, and throughput is now a steady 115k req/s at -p 384, dropping to 105k req/s at -p 8152.

Before (session.rs: 206):

        let rs = self.inner.execute(statement, params).await;

After:

        let session = self.inner.clone();
        let statement = statement.clone();
        let rs =
            tokio::task::spawn(async move { session.execute(statement.as_ref(), params).await })
                .await
                .unwrap();

Such a tiny change and a 10x performance difference.
I hope it can be fixed on the driver side though, as spawning new tasks is generally not cheap.

@pkolaczk pkolaczk changed the title Weird performance behavior: min, p25, p50 and p75 latencies still higher than cassandra_cpp despite lower throughput Scalability issue visible when the number of concurrent requests > 128 Nov 28, 2021
@psarna
Contributor

psarna commented Nov 29, 2021

Very impressive detective work! I just ran a simple experiment and can confirm that increasing the parallelism causes reduced throughput after a sweet spot is reached. I wonder if this problem (https://internals.rust-lang.org/t/questioning-the-design-of-the-future-trait-and-the-cost-implied-by-it/14217) is even solvable without explicitly spawning a task; we'll definitely need to educate ourselves more on Rust and Tokio internals to try and bypass this. /cc @piodul

@psarna
Contributor

psarna commented Nov 29, 2021

Very impressive detective work! I just ran a simple experiment and can confirm that increasing the parallelism causes reduced throughput after a sweet spot is reached. I wonder if this problem (https://internals.rust-lang.org/t/questioning-the-design-of-the-future-trait-and-the-cost-implied-by-it/14217) is even solvable without explicitly spawning a task

I mean, it obviously is solvable, but it would be great to find a way that doesn't involve rewriting the whole core of the driver.

@psarna
Contributor

psarna commented Nov 29, 2021

It's still quite interesting that adding a spawn in latte already leads to such a reduction of the problem - it would mean that the depth of futures in the driver itself is not problematic on its own, because otherwise a spawn wouldn't help (credit for this observation goes to @piodul), but once it's used from within an application that adds more depth - like latte - the problem gets amplified. I wonder if the cassandra-cpp backend wouldn't eventually experience the same issue, just much later, because the bindings make the depth of the resulting state machine much shorter.

In any case, I did some quick experiments and it's evident that for large parallelism values, simplifying the state machine causes vast improvements - e.g. I just prepared a branch which ignores retry policy and speculative retries, and it immediately increases throughput by around 20% for parallelism ~2048 (I picked 2048, because my local machine already experiences a huge drop in throughput compared to lower values).

@pkolaczk
Author

Obviously, I can't rule out for sure that the O(n) complexity happens on the latte side. However, I double-checked: the only place where multiple queries are processed is the main loop, which is pretty simple and standard - it uses buffer_unordered from the futures crate to run multiple futures, and I don't do any manual future chaining myself. The documentation of buffer_unordered says it is optimized in such a way that it should poll only one future per wakeup, so if that is right, it should not cause the problem:

https://docs.rs/futures/0.3.18/futures/stream/futures_unordered/struct.FuturesUnordered.html

And the main loop is here:
https://github.com/pkolaczk/latte/blob/8680cfe90d982bc315c11d796b8d1c1c88f4c288/src/main.rs#L90-L125

@psarna
Contributor

psarna commented Nov 29, 2021

Right - I find it confusing as well, especially having read latte's source code. The only candidate for a culprit is indeed buffer_unordered. I also tried twiddling with channel sizes, but that would have been too simple to work. It's probably worthwhile to try to get rid of buffer_unordered in favor of an open-coded implementation and see if it makes a difference. I'll continue the investigation on my end as well.

@pkolaczk
Author

simplifying the state machine causes vast improvements

The main question, though, is: are those state machines fixed-size, or do they somehow grow with the number of queries submitted to the driver? According to the Rust forums thread, you may run into an issue if there are recursive .await calls or if there is and_then chaining of futures.

@psarna
Contributor

psarna commented Nov 29, 2021

simplifying the state machine causes vast improvements

The main question, though, is: are those state machines fixed-size, or do they somehow grow with the number of queries submitted to the driver? According to the Rust forums thread, you may run into an issue if there are recursive .await calls or if there is and_then chaining of futures.

I tried to inspect the code rather thoroughly and didn't find any obvious recursive calls - retry policies are candidates, but dropping that path did not mitigate the issue. Also, if recursive awaits in the driver were the root cause, I would assume that with sufficiently high parallelism you'd still hit the issue even after spawning a new task for each execute - since the driver-side state machines would still eventually grow big enough to cause noticeable overhead.

@pkolaczk
Author

pkolaczk commented Nov 29, 2021

I switched from buffer_unordered to just buffered, which is simpler.

The actual parallelism level went down, because that introduced a head-of-line blocking problem. For -p 8152 I'm getting:

     Concurrency     [req]     3534.5                                                                 
              └─       [%]       43.4  
      Throughput    [op/s]      30584 ± 1284                                                          
              ├─   [req/s]      30584 ± 1284 

Which is obviously far below expectations.
If I run it with buffer_unordered and -p 3500 I'm getting roughly the same throughput, so I conclude the same problem exists for buffered and buffer_unordered. That also tells us that what really matters here is the number of unfinished futures in the buffer, not the total number of futures in the buffer.

Later I'll try to write a test with an artificially large state machine and check whether buffer_unordered shows the same weird O(N) behavior.

@psarna
Contributor

psarna commented Nov 29, 2021

I haven't verified if my hastily crafted patch is correct, but I'll present it below anyway.

I dropped the idea of using buffer_unordered entirely (and as a result also buffered, since it's basically a worse flavor of buffer_unordered due to head-of-line blocking), and instead used another approach:

  1. Run in a loop until the shared count of all operations is decremented to 0
  2. Inside a loop, take a fixed amount of tasks to be queued (== parallelism), and await them by collecting them to FuturesUnordered

Unless I made some grievous design mistake, this approach works quite similarly to your original code, with the exception that it makes the parallelism come in waves - each batch must finish before the next one is scheduled.

Still, this patch vastly speeds up latte - it easily reaches >150k ops on my machine for a parallelism of 2048. One can still observe a regression when the number goes up (4096 is still ok, but 8192 starts to get slower), but I guess that past some point Tokio is not going to behave well anyway with so many tasks in the loop.

I don't have any hard proof except for the observation above, but it kind of looks like buffer_unordered causes the state machine size to explode.

diff --git a/src/count_down.rs b/src/count_down.rs
index 392325d..0ae836c 100644
--- a/src/count_down.rs
+++ b/src/count_down.rs
@@ -41,6 +41,10 @@ impl CountDown {
             }
         }
     }
+
+    pub fn load(&self) -> u64 {
+        self.value.load(Ordering::Relaxed)
+    }
 }
 
 pub struct BatchedCountDown {
@@ -72,4 +76,8 @@ impl BatchedCountDown {
             true
         }
     }
+
+    pub fn shared_count(&self) -> u64 {
+        self.shared.load()
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index 5b6f9bb..26aefcd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
 
 use clap::Clap;
 use futures::future::ready;
+use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{Future, SinkExt, Stream, StreamExt};
 use scylla::Session;
 use tokio::runtime::Builder;
@@ -92,34 +93,40 @@ where
         let action = &action;
         let mut remaining_count = BatchedCountDown::new(count, 64);
         let pending_count = AtomicUsize::new(0);
-        let mut req_stats = IntervalStream::new(interval(rate))
-            .take_while(|_| ready(remaining_count.dec()))
-            .enumerate()
-            .map(|(i, _)| {
-                let context = context.clone();
-                let pending_count = &pending_count;
-                async move {
-                    let queue_len = pending_count.fetch_add(1, Ordering::Relaxed);
-                    let start = Instant::now();
-                    let result = action(context, i as u64).await;
-                    let end = Instant::now();
-                    pending_count.fetch_sub(1, Ordering::Relaxed);
-                    RequestStats::from_result(result, end - start, queue_len)
-                }
-            })
-            .buffer_unordered(parallelism);
 
         let mut sample = Sample::new(Instant::now());
-        while let Some(req) = req_stats.next().await {
-            sample.record(req);
-            let now = Instant::now();
-            if now - sample.start_time > sampling_period {
-                let start_time = sample.start_time;
-                let elapsed_rounded = round(now - start_time, sampling_period);
-                let end_time = start_time + elapsed_rounded;
-                sample.finish(end_time);
-                tx.send(sample).await.unwrap();
-                sample = Sample::new(end_time);
+
+        while remaining_count.shared_count() > 0 {
+            let mut req_stats = IntervalStream::new(interval(rate))
+                .take_while(|_| ready(remaining_count.dec()))
+                .enumerate()
+                .map(|(i, _)| {
+                    let context = context.clone();
+                    let pending_count = &pending_count;
+                    async move {
+                        let queue_len = pending_count.fetch_add(1, Ordering::Relaxed);
+                        let start = Instant::now();
+                        let result = action(context, i as u64).await;
+                        let end = Instant::now();
+                        pending_count.fetch_sub(1, Ordering::Relaxed);
+                        RequestStats::from_result(result, end - start, queue_len)
+                    }
+                })
+                .take(parallelism)
+                .collect::<FuturesUnordered<_>>()
+                .await;
+
+            while let Some(req) = req_stats.next().await {
+                sample.record(req);
+                let now = Instant::now();
+                if now - sample.start_time > sampling_period {
+                    let start_time = sample.start_time;
+                    let elapsed_rounded = round(now - start_time, sampling_period);
+                    let end_time = start_time + elapsed_rounded;
+                    sample.finish(end_time);
+                    tx.send(sample).await.unwrap();
+                    sample = Sample::new(end_time);
+                }
             }
         }
         if !sample.is_empty() {

@pkolaczk
Author

pkolaczk commented Nov 29, 2021

I'll test it tomorrow, but beware that your patch lowers the average concurrency level, because the number of pending futures will drop to zero in each wave. So obviously it would improve throughput even if the original problem remains.

However, if you're seeing a huge improvement from that, it is even weirder, because buffer_unordered itself is very simple and it mostly delegates all the real work to FuturesUnordered, which you've also used here. The only difference is that buffer_unordered keeps adding new futures as soon as possible, so it keeps the number of pending futures mostly constant. In your solution that adding is deferred.

@psarna
Contributor

psarna commented Nov 30, 2021

I'll test it tomorrow, but beware that your patch lowers the average concurrency level, because the number of pending futures will drop to zero in each wave. So obviously it would improve throughput even if the original problem remains.

It does lower average concurrency, but it comes in bursts, so if the problem is that high concurrency doesn't mix well with large state machines produced by compiling the driver, I think it would get amplified after the patch. The reasoning is that each time a burst of concurrency comes, it would suffer from reduced throughput, and the breaks between bursts only lower the throughput further, because nothing gets executed. But that's still just a guess. In any case, I keep pushing this theory because it's so confusing that adding a tokio::spawn in latte fixes the issue... which would suggest that even though the complexity of futures in the driver might be largish, it's ultimately something in latte that induces the regression, and buffer_unordered sounds like the only sensible culprit. And by the way - the main loop in scylla-rust-driver doesn't really contain any recursive calls, it pretty much uses a pair of channels to queue requests, and a pair of regular loops to send and receive responses.

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

The documentation of buffer_unordered says it is optimized in such a way that it should poll only one future per wakeup, so if that is right, it should not cause the problem:

I did a test and it looks like FuturesUnordered doesn't scale well as the number of futures increases. It looks as if it has to go through all of the futures on each poll().
The test puts n futures in a single FuturesUnordered and then waits for it to finish. All futures finish one by one by locking a mutex.

n increases 2x but time increases 4x, so it looks quadratic:

n: 10000, time: 23ms
n: 20000, time: 87ms
n: 40000, time: 331ms
n: 80000, time: 1230ms
n: 160000, time: 5100ms

Here's the code
main.rs:

use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use tokio::sync::Mutex;
use std::sync::Arc;

async fn work(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn do_test(n: usize) {
    let start: std::time::Instant = std::time::Instant::now();    

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(work(mutex.clone()));
    }
    let mut unorder: FuturesUnordered<_> = futs.into_iter().collect();

    std::mem::drop(mutex_guard);

    for _ in 0..n {
        unorder.select_next_some().await;
    }

    let end: std::time::Instant = std::time::Instant::now();
    println!("n: {}, time: {}ms", n, end.duration_since(start).as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        do_test(n).await;
    }
}

Cargo.toml:

[dependencies]
tokio = { version = "1.12", features = ["full"] }
futures = "0.3.6"

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

Doing it using tokio::spawn gives linear time:

n: 10000, time: 5ms
n: 20000, time: 11ms
n: 40000, time: 23ms
n: 80000, time: 47ms
n: 160000, time: 86ms

use tokio::sync::Mutex;
use std::sync::Arc;

async fn work(mutex: Arc<Mutex<()>>) {
    mutex.lock().await;
}

async fn do_test(n: usize) {
    let start: std::time::Instant = std::time::Instant::now();    

    let mutex: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    let mutex_guard = mutex.lock().await;

    let mut futs = Vec::new();
    for _ in 0..n {
        futs.push(tokio::spawn(work(mutex.clone())));
    }

    std::mem::drop(mutex_guard);

    for f in futs {
        f.await.unwrap();
    }

    let end: std::time::Instant = std::time::Instant::now();
    println!("n: {}, time: {}ms", n, end.duration_since(start).as_millis());
}

#[tokio::main]
async fn main() {
    for n in [10_000, 20_000, 40_000, 80_000, 160_000] {
        do_test(n).await;
    }
}

@psarna
Contributor

psarna commented Nov 30, 2021

Sad. We need to double-check our internal uses of FuturesUnordered, but it seems we only use it in the connection pool and speculative execution. For speculative execution the number will probably be single-digit, so quadratic behavior doesn't matter, and for the connection pool there is already a FIXME saying that the usage of FuturesUnordered should probably be replaced with a channel. I still don't expect it to be a problem, because we won't likely be establishing tens of thousands of connections, but it's better to be safe than sorry.

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

I wonder what's going on here; the implementation of FuturesUnordered is smart and tries hard to be linear. This could be a bug, I might open an issue.

@psarna
Contributor

psarna commented Nov 30, 2021

I wonder what's going on here; the implementation of FuturesUnordered is smart and tries hard to be linear. This could be a bug, I might open an issue.

Does it though? It has mechanisms for not polling its futures more than once, but I don't see any mechanism that prevents polling O(n) futures when FuturesUnordered is awakened, until it finds the one that's actually ready.

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

It looks like it has two queues:

  • for futures that need to be polled
  • for futures that don't need to be polled

When polling a managed future, it gives it a custom waker which moves the future to the first queue and then calls the waker of FuturesUnordered.

This way it calls poll only on futures that actually called their waker; all other pending futures are left alone on the other queue.

That should ensure that performance is fine o_0

@psarna
Contributor

psarna commented Nov 30, 2021

It looks like it has two queues:

  • for futures that need to be polled
  • for futures that don't need to be polled

When polling a managed future, it gives it a custom waker which moves the future to the first queue and then calls the waker of FuturesUnordered.

This way it calls poll only on futures that actually called their waker; all other pending futures are left alone on the other queue.

That should ensure that performance is fine o_0

Alright, I was looking at the source code for some ancient 0.1.x version... I keep doing that when browsing Rust docs.

@psarna
Contributor

psarna commented Nov 30, 2021

@cvybhu in any case, please post your reproducer and results on futures' bug tracker: https://github.com/rust-lang/futures-rs/issues and link to this issue; perhaps it's a known thing.

@psarna
Contributor

psarna commented Nov 30, 2021

Oh, we had a race here. Never mind :)

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

Opened rust-lang/futures-rs#2526

@psarna
Contributor

psarna commented Nov 30, 2021

@pkolaczk @cvybhu following the advice from rust-lang/futures-rs#2526 (comment), the patch below indeed not only makes the problem disappear completely, but also significantly speeds up the execution even for lower concurrency values for me!

diff --git a/src/main.rs b/src/main.rs
index 5b6f9bb..cd80643 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -101,7 +101,7 @@ where
                 async move {
                     let queue_len = pending_count.fetch_add(1, Ordering::Relaxed);
                     let start = Instant::now();
-                    let result = action(context, i as u64).await;
+                    let result = tokio::task::unconstrained(action(context, i as u64)).await;
                     let end = Instant::now();
                     pending_count.fetch_sub(1, Ordering::Relaxed);
                     RequestStats::from_result(result, end - start, queue_len)

@pkolaczk
Author

pkolaczk commented Nov 30, 2021

Thanks for the investigation. Wow, I'd never have thought about a trap like that!
@psarna I think you're looking at an older latte version - I'll port the patch to the new one and let you know how fast it is now.

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

Wait, adding unconstrained will disable the blocking protection and we will block the runtime, no?
We could instead add tokio::task::yield_now().await after reading the next result from FuturesUnordered.

@psarna
Contributor

psarna commented Nov 30, 2021

It will only block if the driver never yields, i.e. sockets are always ready and so on. For a benchmarking tool, turning off yielding might make sense, depending on how it influences latency.

@pkolaczk
Author

pkolaczk commented Nov 30, 2021

@cvybhu I'm reading that differently - it only avoids yielding while the future is ready, but it still yields if the future is not ready.
https://docs.rs/tokio/1.5.0/tokio/task/index.html#unconstrained

However, I fail to see how it is connected to the O(n^2) complexity of buffer_unordered.

@cvybhu
Contributor

cvybhu commented Nov 30, 2021

Here's my understanding of what's going on:
rust-lang/futures-rs#2526 (comment)

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

I added unconstrained and gave it a test. The results are different but still unsatisfactory.

The quadratic complexity issue is indeed gone. Upping the parallelism level does not lower the throughput.
The max throughput went up by about 10-15%.
However, my max throughput is still lower than the one I can get from cassandra_cpp (another 15% needed).

What I noticed is that now, regardless of the -p setting, the actual average number of pending futures is about 150-170 and hits some kind of ceiling.

-p 256: actual concurrency 116.3, throughput 114k
-p 512: actual concurrency 131.9, throughput 114k
-p 1000: actual concurrency 166.3, throughput 112k

cassandra_cpp is doing about 125-130k on the same server, with 1 thread, and the actual parallelism level is much higher - easily goes above 350. Note that these are extremely tiny statements on a link with extremely low latency, so parallelism must be high in order to fully load C* on the other side.

If I increase the number of threads to 2, then I can almost match cassandra_cpp on 1 thread.

I've got 2 hypotheses what's causing this:

  1. now that I've added unconstrained, the producer gets starved and simply can't deliver enough concurrent requests for execution; all power goes into the driver to get them processed as fast as possible, and because the number of requests is low, it doesn't take much advantage of buffering.

  2. because cassandra_cpp has a separate libuv executor, it is not truly single-threaded; so it is not a true apples-to-apples comparison, as the producer runs on a different thread than the I/O thread sending the requests. So it might simply be too much work for a single thread: scylla spends most of the time processing requests and not much of the CPU core is left for producing them. However, CPU time seems to disprove that - CPU time is not higher for cassandra_cpp at 130k req/s.

BTW: if I remove the actual session.execute call and leave all the other things in place (running the script, including parameter conversion), it reaches over 610k req/s. So the producer is definitely fast enough for this test. Obviously I can work on that part more but first I'd like to rule out problem 1.

@dorlaor

dorlaor commented Dec 1, 2021 via email

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

That's a great idea!

taskset -c 0 ./target/release/latte run workloads/basic/read.rn -d 1000000 -p 128

cassandra_cpp:

    Elapsed time       [s]      9.886                                                                 
        CPU time       [s]      9.592                                                                 
 CPU utilisation       [%]       97.0       // looks correct, got 1 core only
     Concurrency     [req]       95.2                                                                 
              └─       [%]       74.4                                                                 
      Throughput    [op/s]     101158 ± 1350                                                          
              ├─   [req/s]     101158 ± 1350                                                          
              └─   [row/s]     101158 ± 1350                                                          
  Mean call time      [ms]      1.075 ± 0.018                                                         
 Mean resp. time      [ms]      1.071 ± 0.018 

scylla:

    Elapsed time       [s]     19.756                                                                 
        CPU time       [s]     17.771                                                                 
 CPU utilisation       [%]       90.0 

  Concurrency     [req]       79.2                                                                 
              └─       [%]       61.9                                                                 
      Throughput    [op/s]      50620 ± 414                                                           
              ├─   [req/s]      50620 ± 414                                                           
              └─   [row/s]      50620 ± 414                                                           
  Mean call time      [ms]      1.520 ± 0.022                                                         
 Mean resp. time      [ms]      1.516 ± 0.022  

scylla 2x slower. Why? Something is wrong.

I tried to swap the futures crate for that new branch with the fix instead of using unconstrained, but apparently my code doesn't compile with that futures release (incompatible with 3.0? incompatible with tokio?). I already asked there.

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

And I verified CPU frequency was properly locked at 3 GHz during both runs.

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

It looks like it sends one request at a time:

Starting iteration 0
Sending 1 requests; 46 bytes                                                     
Starting iteration 1
Sending 1 requests; 46 bytes
Starting iteration 2
Sending 1 requests; 46 bytes
Starting iteration 3
Sending 1 requests; 46 bytes
Starting iteration 4
Sending 1 requests; 46 bytes
Starting iteration 5
Sending 1 requests; 46 bytes
Starting iteration 6
Sending 1 requests; 46 bytes
Starting iteration 7
Sending 1 requests; 46 bytes
Starting iteration 8
Sending 1 requests; 46 bytes
Starting iteration 9
Sending 1 requests; 46 bytes
Starting iteration 10
Sending 1 requests; 46 bytes
Starting iteration 11
Sending 1 requests; 46 bytes
Starting iteration 12
Sending 1 requests; 46 bytes
Starting iteration 13
Sending 1 requests; 46 bytes
Starting iteration 14
Sending 1 requests; 46 bytes
Starting iteration 15
Sending 1 requests; 46 bytes
Starting iteration 16
Sending 1 requests; 46 bytes
Starting iteration 17
Sending 1 requests; 46 bytes
Starting iteration 18
Sending 1 requests; 46 bytes
Starting iteration 19
Sending 1 requests; 46 bytes
Starting iteration 20

I added yield_now() before the task_receiver.try_recv() but it didn't help. :(
Even if I call it multiple times in a loop, it doesn't seem to change anything.

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

Oh, man, another trap here:

This function may not yield all the way up to the executor if there are any special combinators above it in the call stack. For example, if a tokio::select! has another branch complete during the same poll as the yield_now(), then the yield is not propagated all the way up to the runtime.
It is generally not guaranteed that the runtime behaves like you expect it to when deciding which task to schedule next after a call to yield_now()

So it looks like the producer is indeed starved - it can add only one new request per write to the channel, and write buffering doesn't work properly (and we already know write buffering is essential to get good perf here).

And yield_now() somehow doesn't work as I expected - I was hoping yielding would give the main loop another spin.

So I guess the way to go is to wait for futures_rs to backport their fix to 0.3 and remove unconstrained.
Unless you have an idea where I should put yield_now() so it actually un-starves the producer.

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

So I guess the way to go is to wait for futures_rs to backport their fix to 0.3 and remove unconstrained.

I managed to test that by temporarily dropping IntervalStream, but that did not help - still sending mostly 1 request at a time.

@pkolaczk
Author

pkolaczk commented Dec 1, 2021

Well, one more thing I tried - wrapping workload.run in its own task, spawned with tokio::task::spawn_local (I have to spawn local, because it is !Send). Surprisingly, that also didn't help, although it changed the concurrency level reported by latte - now it runs the number of futures close to the limit, but the driver still sends out queries one per buffer. And the overall effect was that it is even slower (~32k req/s) when the process is pinned to one core.
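For reference, a rough sketch of that spawn-local setup (run_workload is a stand-in for the !Send workload future; tokio::task::spawn_local needs to run inside a LocalSet):

use tokio::task::LocalSet;

// Stand-in for the !Send workload future mentioned above.
async fn run_workload() {}

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let local = LocalSet::new();
    // Futures spawned with spawn_local stay on this thread, so they may be !Send.
    local.block_on(&rt, async {
        tokio::task::spawn_local(run_workload()).await.unwrap();
    });
}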

@pkolaczk
Author

pkolaczk commented Dec 2, 2021

I tried to debug how buffer_unordered actually polls the scylla futures and made a weird observation:

  • when I just launch a simple buffer_unordered in main, it all works as desired and buffering works (it polls multiple futures, they return PENDING, and afterwards it sends all requests in one go).
  • when I added the same debugging to the futures inside the latte main loop, it sends one request at a time...

Something fishy must be going on on the latte side then, stay tuned.

@pkolaczk
Author

pkolaczk commented Dec 2, 2021

Looks like there is a huge difference just stemming from launching on another thread:

    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    std::thread::spawn(move || {
        rt.block_on(async move {
            println!("Stream started");
            let mut s = futures::stream::iter(0..10)
                .map(|i| DebugFuture::new(format!("q {}", i), loader.run(i)) )
                .buffer_unordered(5);

            while let Some(result) = s.next().await {
                println!("Got query result: {:?}", result);
            }
            println!("Stream ended");
        });
    });
    tokio::time::sleep(Duration::from_secs(100)).await;

Output:

Stream started
Polling future q 0
Future q 0 returned PENDING
Polling future q 1
Sending 1 requests; 46 bytes     /// <<<----- oh, why are you already sending?
Future q 1 returned PENDING
Polling future q 2
Sending 1 requests; 46 bytes
Future q 2 returned PENDING
Polling future q 3
Sending 1 requests; 46 bytes
Future q 3 returned PENDING
Polling future q 4
Sending 1 requests; 46 bytes
Future q 4 returned PENDING
Sending 1 requests; 46 bytes
Polling future q 1
Future q 1 returned READY
Got query result: Ok(())
Polling future q 0
Future q 0 returned READY
Got query result: Ok(())
Polling future q 2
Future q 2 returned READY
Got query result: Ok(())
Polling future q 5
Future q 5 returned PENDING
Polling future q 6
Sending 1 requests; 46 bytes           
Future q 6 returned PENDING
Polling future q 3
Sending 1 requests; 46 bytes
Future q 3 returned READY
Got query result: Ok(())
Polling future q 4
Future q 4 returned READY
Got query result: Ok(())
Polling future q 7
Future q 7 returned PENDING
Polling future q 8
Sending 1 requests; 46 bytes
Future q 8 returned PENDING
Polling future q 9
Sending 1 requests; 46 bytes
Future q 9 returned PENDING
Polling future q 5
Sending 1 requests; 46 bytes
Future q 5 returned READY
Got query result: Ok(())
Polling future q 6
Future q 6 returned READY
Got query result: Ok(())
Polling future q 7
Future q 7 returned READY
Got query result: Ok(())
Polling future q 8
Future q 8 returned READY
Got query result: Ok(())
Polling future q 9
Future q 9 returned READY
Got query result: Ok(())
Stream ended

But the same code executed on the main (also single-threaded) runtime, without spawning an additional thread:

 println!("Stream started");
let mut s = futures::stream::iter(0..10)
    .map(|i| DebugFuture::new(format!("q {}", i), loader.run(i)) )
    .buffer_unordered(5);

while let Some(result) = s.next().await {
    println!("Got query result: {:?}", result);
}
println!("Stream ended");
Stream started
Polling future q 0
Future q 0 returned PENDING
Polling future q 1
Future q 1 returned PENDING
Polling future q 2
Future q 2 returned PENDING
Polling future q 3
Future q 3 returned PENDING
Polling future q 4
Future q 4 returned PENDING
Sending 5 requests; 230 bytes    // <<<<<<<<<<<<<<<<<<< SEE THIS
Polling future q 4
Future q 4 returned READY
Got query result: Ok(())
Polling future q 3
Future q 3 returned READY
Got query result: Ok(())
Polling future q 0
Future q 0 returned READY
Got query result: Ok(())
Polling future q 2
Future q 2 returned READY
Got query result: Ok(())
Polling future q 1
Future q 1 returned READY
Got query result: Ok(())
Polling future q 5
Future q 5 returned PENDING
Polling future q 6
Future q 6 returned PENDING
Polling future q 7
Future q 7 returned PENDING
Polling future q 8
Future q 8 returned PENDING
Polling future q 9
Future q 9 returned PENDING
Sending 5 requests; 230 bytes   // <<<<<<<<<<<<<<<<<<< SEE THIS
Polling future q 7
Future q 7 returned READY
Got query result: Ok(())
Polling future q 5
Future q 5 returned READY
Got query result: Ok(())
Polling future q 6
Future q 6 returned READY
Got query result: Ok(())
Polling future q 8
Future q 8 returned READY
Got query result: Ok(())
Polling future q 9
Future q 9 returned READY
Got query result: Ok(())
Stream ended

Any ideas why this could be?

I believe I should figure out how to avoid running my own threads and just use a single multithreaded RT instead.
(for this to work I need to figure out how to solve some problems with !Send futures, unrelated to Scylla)

@Jasperav
Contributor

Jasperav commented Dec 3, 2021

Not sure if this is the right issue to comment on, but I was quite surprised to see that cdrs_tokio is quicker than this crate in most scenarios according to the benchmarks provided in the readme: https://github.com/krojew/cdrs-tokio. Maybe it would be interesting to see how the maintainer managed to make it perform so quickly.

@psarna
Contributor

psarna commented Dec 3, 2021

@Jasperav the benchmarks presented in the readme do not mention any of the following:

  • cluster setup: number of nodes, network, type of machines
  • was it run against Cassandra or Scylla
  • client setup: on what machine was the benchmark itself run - was it remote or local to the cluster?
  • drivers setup: which non-default configuration options were used for respective drivers, if any?
  • in general: the source code of the benchmarks, to make it reviewable and reproducible (or maybe it is somewhere, I just didn't find any mention of it)

so it's hard to deduce anything from the output. For any reproducible benchmarks we'd be happy to rerun and verify them, and do our best to match and/or outperform other drivers, if they prove to be faster.

@pkolaczk
Author

pkolaczk commented Dec 3, 2021

After I switched to a single tokio runtime I'm getting way better results now and buffering on Scylla 0.3.1 works properly in that scenario, even with tokio::task::unconstrained.

Running on a single executor also has the nice property that when I specify -t 1 (one thread), it really doesn't use more than 1 CPU core. As I've said before, cassandra_cpp runs its own libuv executor, so it may achieve better results by simply using more CPU.

I have also found that adding lto = true; panic = "abort" to the release profile improves speed significantly.
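For reference, those are standard Cargo settings in the release profile of Cargo.toml:

[profile.release]
lto = true
panic = "abort"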

To summarize:

  • futures::buffer_unordered has a scalability issue when running many futures with bigger state machines; there is an easy workaround of wrapping the futures in unconstrained, and there is a fix ready in the futures-rs project.
  • the scylla driver doesn't seem to like it when the session is established on one executor but queries are spawned on a different executor - something weird happens and buffering doesn't work properly.

Feel free to close the ticket.
Or leave it open if you want to investigate why buffering doesn't work when queries spawned on a separate executor.
Thank you for all the help.

@Jasperav there are a lot of different things that can affect benchmark results.

@psarna
Contributor

psarna commented Jan 14, 2022

FYI, FuturesUnordered will likely soon be able to detect when a future wants to cooperatively yield: rust-lang/futures-rs#2551. The nice trick is checking whether it's the task waking itself vs. something else waking it.
