Skip to content

Commit

Permalink
Naming, review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
  • Loading branch information
bleggett committed Apr 17, 2024
1 parent ee3d871 commit 440f533
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ impl WorkloadHBONEPool {
//
// If many `connects` request a connection to the same dest at once, all will wait until exactly
// one connection is created, before deciding if they should create more or just use that one.
pub async fn connect(&mut self, key: WorkloadKey) -> Result<ConnClient, Error> {
pub async fn connect(&mut self, workload_key: WorkloadKey) -> Result<ConnClient, Error> {
debug!("pool connect START");
// TODO BML this may not be collision resistant/slow. It should be resistant enough for workloads tho.
let mut s = DefaultHasher::new();
key.hash(&mut s);
workload_key.hash(&mut s);
let hash_key = s.finish();
let pool_key = pingora_pool::ConnectionMeta::new(
hash_key,
Expand All @@ -143,7 +143,7 @@ impl WorkloadHBONEPool {
// This is so we can backpressure correctly if 1000 tasks all demand a new connection
// to the same key at once, and not eagerly open 1000 tunnel connections.
let existing_conn = self
.first_checkout_conn_from_pool(&key, hash_key, &pool_key)
.first_checkout_conn_from_pool(&workload_key, &pool_key)
.await;

debug!("pool connect GOT EXISTING");
Expand Down Expand Up @@ -219,17 +219,17 @@ impl WorkloadHBONEPool {
// So, carry on doing that.
debug!("appears we need a new conn, retaining connlock");
debug!("nothing else is creating a conn, make one");
let pool_conn = self.spawn_new_pool_conn(key.clone()).await;
let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await;
let client = ConnClient{
sender: pool_conn?,
stream_count: Arc::new(AtomicU16::new(0)),
stream_count_max: self.max_streamcount,
wl_key: key.clone(),
wl_key: workload_key.clone(),
};

debug!(
"starting new conn for key {:#?} with pk {:#?}",
key, pool_key
workload_key, pool_key
);
debug!("dropping lock");
Some(client)
Expand Down Expand Up @@ -265,7 +265,7 @@ impl WorkloadHBONEPool {
// We found a conn, but it's already maxed out.
// Return None and create another.
if e_conn.at_max_streamcount() {
debug!("found existing conn for key {:#?}, but streamcount is maxed", key);
debug!("found existing conn for key {:#?}, but streamcount is maxed", workload_key);
break None;
}
debug!("pool connect LOOP END");
Expand Down Expand Up @@ -310,13 +310,13 @@ impl WorkloadHBONEPool {
// (streamcount maxed, etc)
// Start a new one, clone the underlying client, return a copy, and put the other back in the pool.
None => {
debug!("spawning new conn for key {:#?} to replace", key);
let pool_conn = self.spawn_new_pool_conn(key.clone()).await;
debug!("spawning new conn for key {:#?} to replace", workload_key);
let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await;
let r_conn = ConnClient{
sender: pool_conn?,
stream_count: Arc::new(AtomicU16::new(0)),
stream_count_max: self.max_streamcount,
wl_key: key.clone(),
wl_key: workload_key.clone(),
};
self.checkin_conn(r_conn.clone(), pool_key.clone());
Ok(r_conn)
Expand All @@ -330,7 +330,7 @@ impl WorkloadHBONEPool {
// not equal the provided key
//
// this is a final safety check for collisions, we will throw up our hands and refuse to return the conn
match conn.is_for_workload(key) {
match conn.is_for_workload(workload_key) {
Ok(()) => {
return Ok(conn)
}
Expand All @@ -343,22 +343,21 @@ impl WorkloadHBONEPool {
}
async fn first_checkout_conn_from_pool(
&self,
key: &WorkloadKey,
hash_key: u64,
workload_key: &WorkloadKey,
pool_key: &pingora_pool::ConnectionMeta,
) -> Option<ConnClient> {
debug!("first checkout READLOCK");
let map_read_lock = self.established_conn_writelock.read().await;
match map_read_lock.get(&hash_key) {
match map_read_lock.get(&pool_key.key) {
Some(exist_conn_lock) => {
debug!("first checkout INNER WRITELOCK");
let _conn_lock = exist_conn_lock.as_ref().unwrap().lock().await;

debug!("getting conn for key {:#?} and hash {:#?}", key, hash_key);
self.connected_pool.get(&hash_key).and_then(|e_conn| {
debug!("got existing conn for key {:#?}", key);
debug!("getting conn for key {:#?} and hash {:#?}", workload_key, pool_key.key);
self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
debug!("got existing conn for key {:#?}", workload_key);
if e_conn.at_max_streamcount() {
debug!("got conn for key {:#?}, but streamcount is maxed", key);
debug!("got conn for key {:#?}, but streamcount is maxed", workload_key);
None
} else {
self.checkin_conn(e_conn.clone(), pool_key.clone());
Expand Down

0 comments on commit 440f533

Please sign in to comment.