Skip to content

Commit

Permalink
lints
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
  • Loading branch information
bleggett committed Apr 26, 2024
1 parent 4495fc2 commit 7e794b6
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ impl PoolState {
};
match found_conn {
Some(exist_conn_lock) => {
debug!("checkout - found mutex for key, waiting for writelock");
debug!(
"checkout - found mutex for pool key {:#?}, waiting for writelock",
pool_key
);
let _conn_lock = exist_conn_lock.as_ref().lock().await;

trace!(
Expand Down Expand Up @@ -639,7 +642,6 @@ mod test {
use futures_util::StreamExt;
use hyper::body::Incoming;

use http_body_util::BodyExt;
use hyper::service::service_fn;
use hyper::{Request, Response};
use std::sync::atomic::AtomicU32;
Expand Down Expand Up @@ -1164,7 +1166,7 @@ mod test {
"actual before conncount was {before_conncount}"
);
assert!(
before_dropcount == 0,
before_dropcount != 3,
"actual before dropcount was {before_dropcount}"
);

Expand Down Expand Up @@ -1199,7 +1201,7 @@ mod test {
let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
pool_max_streams_per_conn: 50,
pool_unused_release_timeout: Duration::from_secs(2),
pool_unused_release_timeout: Duration::from_secs(1),
..crate::config::parse_config().unwrap()
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
Expand All @@ -1217,47 +1219,58 @@ mod test {
let mut tasks = futures::stream::FuturesUnordered::new();
loop {
count += 1;
tasks.push(spawn_client(pool.clone(), key1.clone(), server_addr, 25));
tasks.push(spawn_client(pool.clone(), key1.clone(), server_addr, 1));

if count == client_count {
break;
}
}

let (client_stop_signal, client_stop) = drain::channel();
let persist_res =
spawn_persistent_client(pool.clone(), key1.clone(), server_addr, client_stop);

// TODO we spawn clients too fast (and they have little to do) and they actually break the
// local "fake" test server, causing it to start returning "conn refused/peer refused the connection"
// when the pool tries to create new connections for that caller
//
// (the pool will just pass that conn refused back to the caller)
//
// In the real world this is fine, since we aren't hitting a local server,
// servers can refuse connections - in synthetic tests it leads to flakes.
//
// It is worth considering if the pool should throttle how frequently it allows itself to create
// connections to real upstreams (e.g. "I created a conn for this key 10ms ago and you've already burned through
// your streamcount, chill out, you're gonna overload the dest")
//
// For now, streamcount is an inexact flow control for this.
sleep(Duration::from_millis(500)).await;
//loop thru the nonpersistent clients and wait for them to finish
while let Some(Err(res)) = tasks.next().await {
assert!(!res.is_panic(), "CLIENT PANICKED!");
continue;
}

let (client_stop_signal, client_stop) = drain::channel();
let persist_res =
spawn_persistent_client(pool.clone(), key1.clone(), server_addr, client_stop);

//Attempt to wait a bit more, to ensure the connections NOT held open by our persistent client are dropped.
sleep(Duration::from_secs(1)).await;
let before_conncount = conn_counter.load(Ordering::Relaxed);
let before_dropcount = conn_drop_counter.load(Ordering::Relaxed);
assert!(
before_conncount == 3,
"actual before conncount was {before_conncount}"
);
// At this point, we should still have one conn that hasn't been dropped
// because we haven't ended the persistent client
assert!(
before_dropcount == 0,
before_dropcount == 2,
"actual before dropcount was {before_dropcount}"
);

// Attempt to wait long enough for pool conns to timeout+evict
sleep(Duration::from_secs(2)).await;

let real_conncount = conn_counter.load(Ordering::Relaxed);
assert!(real_conncount == 3, "actual conncount was {real_conncount}");
// At this point, we should still have one conn that hasn't been dropped
// because we haven't ended the persistent client
let real_dropcount = conn_drop_counter.load(Ordering::Relaxed);
assert!(real_dropcount == 2, "actual dropcount was {real_dropcount}");
client_stop_signal.drain().await;
assert!(persist_res.await.is_ok(), "PERSIST CLIENT ERROR");

sleep(Duration::from_secs(2)).await;
//Attempt to wait a bit more, to ensure the connections held open by our persistent client is dropped.
sleep(Duration::from_secs(1)).await;

let after_conncount = conn_counter.load(Ordering::Relaxed);
assert!(
Expand Down Expand Up @@ -1311,23 +1324,12 @@ mod test {

if res.is_err() {
panic!("SEND ERR: {:#?} sendcount {count}", res);
} else if !res.is_ok() {
panic!("CLIENT RETURNED ERROR")
}

let mut okres = res.unwrap();
const HBONE_MESSAGE: &[u8] = b"hbone\n";
while let Some(next) = okres.frame().await {
let frame = next.expect("better have a resp body");
if let Some(chunk) = frame.data_ref() {
assert_eq!(HBONE_MESSAGE, chunk);
}
}
if count >= req_count {
debug!("CLIENT DONE");
break;
}

}
})
}
Expand Down Expand Up @@ -1361,17 +1363,16 @@ mod test {
start.elapsed().as_millis()
);

let mut count = 0u32;
// send forever, once we get a conn, until someone signals us to stop
let send_loop = async move {
//send once, then hold the conn open until signaled
let res = c1.send_request(req()).await;
if res.is_err() {
panic!("SEND ERR: {:#?}", res);
}
loop {
count += 1;
let res = c1.send_request(req()).await;
if res.is_err() {
panic!("SEND ERR: {:#?} sendcount {count}", res);
} else if res.unwrap().status() != 200 {
panic!("CLIENT RETURNED ERROR")
}
debug!("persistent client yielding");
sleep(Duration::from_millis(1)).await; //yield may be enough
tokio::task::yield_now().await;
}
};

Expand All @@ -1397,10 +1398,9 @@ mod test {
tokio::task::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
let (mut ri, mut wi) =
tokio::io::split(hyper_util::rt::TokioIo::new(upgraded));
wi.write_all(b"hbone\n").await.unwrap();
tcp::handle_stream(tcp::Mode::ReadWrite, &mut ri, &mut wi).await;
let mut io = hyper_util::rt::TokioIo::new(upgraded);
io.write_all(b"poolsrv\n").await.unwrap();
tcp::handle_stream(tcp::Mode::ReadWrite, &mut io).await;
}
Err(e) => panic!("No upgrade {e}"),
}
Expand Down

0 comments on commit 7e794b6

Please sign in to comment.