Skip to content

Commit

Permalink
Drop futures-util dep from benchmarking tools
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Dec 29, 2021
1 parent ab93560 commit 96526a3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 24 deletions.
3 changes: 1 addition & 2 deletions bench/Cargo.toml
Expand Up @@ -8,12 +8,11 @@ publish = false
[dependencies]
anyhow = "1.0.22"
bytes = "1"
futures-util = "0.3.11"
hdrhistogram = { version = "7.2", default-features = false }
quinn = { path = "../quinn" }
rcgen = "0.8"
rustls = { version = "0.20", default-features = false, features = ["quic"] }
structopt = "0.3"
tokio = { version = "1.0.1", features = ["rt"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1.10"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
51 changes: 30 additions & 21 deletions bench/src/bin/bulk.rs
@@ -1,8 +1,12 @@
use std::{net::SocketAddr, sync::Arc, time::Instant};
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
time::Instant,
};

use anyhow::{Context, Result};
use futures_util::StreamExt;
use structopt::StructOpt;
use tokio::sync::Semaphore;
use tracing::{info, trace};

use bench::{
Expand Down Expand Up @@ -55,13 +59,12 @@ fn main() {
server_thread.join().expect("server thread");
}

async fn server(incoming: quinn::Incoming, opt: Opt) -> Result<()> {
// Handle only the expected amount of clients
let mut incoming = incoming.take(opt.clients);

async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

while let Some(handshake) = incoming.next().await {
// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = incoming.next().await.unwrap();
let quinn::NewConnection {
mut bi_streams,
connection,
Expand Down Expand Up @@ -116,23 +119,29 @@ async fn client(

let connection = Arc::new(connection);

let mut ops =
futures_util::stream::iter(
(0..opt.streams).map(|_| {
let connection = connection.clone();
async move {
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await
}
}),
)
.buffer_unordered(opt.max_streams);

let mut stats = ClientStats::default();
let mut first_error = None;

while let Some(stream_result) = ops.next().await {
info!("stream finished: {:?}", stream_result);
match stream_result {
let sem = Arc::new(Semaphore::new(opt.max_streams));
let results = Arc::new(Mutex::new(Vec::new()));
for _ in 0..opt.streams {
let permit = sem.clone().acquire_owned().await.unwrap();
let results = results.clone();
let connection = connection.clone();
tokio::spawn(async move {
let result =
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
info!("stream finished: {:?}", result);
results.lock().unwrap().push(result);
drop(permit);
});
}

// Wait for remaining streams to finish
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();

for result in results.lock().unwrap().drain(..) {
match result {
Ok((upload_result, download_result)) => {
stats.upload_stats.stream_finished(upload_result);
stats.download_stats.stream_finished(download_result);
Expand Down
1 change: 0 additions & 1 deletion perf/Cargo.toml
Expand Up @@ -12,7 +12,6 @@ json-output = ["serde", "serde_json"]

[dependencies]
anyhow = "1.0.22"
futures-util = "0.3.11"
hdrhistogram = { version = "7.2", default-features = false }
quinn = { path = "../quinn" }
rcgen = "0.8"
Expand Down

0 comments on commit 96526a3

Please sign in to comment.