From 64c7beefb77ce6fdc185dad9964ccc9ec8223303 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 14 May 2021 18:31:48 -0700 Subject: [PATCH] perf(http2): slow adaptive window pings as the BDP stabilizes This introduces a delay to sending a ping to calculate the BDP that becomes shorter as the BDP is changing, to improve throughput quickly, but then also becomes longer as the BDP stabilizes, to reduce the amount of pings sent. This improved the performance of the adaptive window end_to_end benchmark. It should also reduce the amount of pings the remote has to deal with, hopefully preventing hyper from triggering ENHANCE_YOUR_CALM errors. --- src/proto/h2/ping.rs | 81 ++++++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/src/proto/h2/ping.rs b/src/proto/h2/ping.rs index 105fc69a39..495f7dcde4 100644 --- a/src/proto/h2/ping.rs +++ b/src/proto/h2/ping.rs @@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) bdp: wnd, max_bandwidth: 0.0, rtt: 0.0, + ping_delay: Duration::from_millis(100), + stable_count: 0, }); - let bytes = bdp.as_ref().map(|_| 0); + let (bytes, next_bdp_at) = if bdp.is_some() { + (Some(0), Some(Instant::now())) + } else { + (None, None) + }; #[cfg(feature = "runtime")] let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { @@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) is_keep_alive_timed_out: false, ping_pong, ping_sent_at: None, + next_bdp_at, })); ( @@ -125,6 +132,9 @@ struct Shared { /// If `Some`, bdp is enabled, and this tracks how many bytes have been /// read during the current sample. bytes: Option, + /// We delay a variable amount of time between BDP pings. This allows us + /// to send less pings as the bandwidth stabilizes. + next_bdp_at: Option, // keep-alive /// If `Some`, keep-alive is enabled, and the Instant is how long ago @@ -143,6 +153,12 @@ struct Bdp { max_bandwidth: f64, /// Round trip time in seconds rtt: f64, + /// Delay the next ping by this amount. + /// + /// This will change depending on how stable the current bandwidth is. + ping_delay: Duration, + /// The count of ping round trips where BDP has stayed the same. + stable_count: u32, } #[cfg(feature = "runtime")] @@ -207,6 +223,17 @@ impl Recorder { #[cfg(feature = "runtime")] locked.update_last_read_at(); + // are we ready to send another bdp ping? + // if not, we don't need to record bytes either + + if let Some(ref next_bdp_at) = locked.next_bdp_at { + if Instant::now() < *next_bdp_at { + return; + } else { + locked.next_bdp_at = None; + } + } + if let Some(ref mut bytes) = locked.bytes { *bytes += len; } else { @@ -265,6 +292,7 @@ impl Recorder { impl Ponger { pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll { + let now = Instant::now(); let mut locked = self.shared.lock().unwrap(); #[cfg(feature = "runtime")] let is_idle = self.is_idle(); @@ -282,12 +310,11 @@ impl Ponger { return Poll::Pending; } - let (bytes, rtt) = match locked.ping_pong.poll_pong(cx) { + match locked.ping_pong.poll_pong(cx) { Poll::Ready(Ok(_pong)) => { - let rtt = locked + let rtt = now - locked .ping_sent_at - .expect("pong received implies ping_sent_at") - .elapsed(); + .expect("pong received implies ping_sent_at"); locked.ping_sent_at = None; trace!("recv pong"); @@ -299,19 +326,20 @@ impl Ponger { } } - if self.bdp.is_some() { + if let Some(ref mut bdp) = self.bdp { let bytes = locked.bytes.expect("bdp enabled implies bytes"); locked.bytes = Some(0); // reset trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); - (bytes, rtt) - } else { - // no bdp, done! - return Poll::Pending; + + let update = bdp.calculate(bytes, rtt); + locked.next_bdp_at = Some(now + bdp.ping_delay); + if let Some(update) = update { + return Poll::Ready(Ponged::SizeUpdate(update)) + } } } Poll::Ready(Err(e)) => { debug!("pong error: {}", e); - return Poll::Pending; } Poll::Pending => { #[cfg(feature = "runtime")] @@ -324,19 +352,11 @@ impl Ponger { } } } - - return Poll::Pending; } - }; - - drop(locked); - - if let Some(bdp) = self.bdp.as_mut().and_then(|bdp| bdp.calculate(bytes, rtt)) { - Poll::Ready(Ponged::SizeUpdate(bdp)) - } else { - // XXX: this doesn't register a waker...? - Poll::Pending } + + // XXX: this doesn't register a waker...? + Poll::Pending } #[cfg(feature = "runtime")] @@ -386,6 +406,7 @@ impl Bdp { fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option { // No need to do any math if we're at the limit. if self.bdp as usize == BDP_LIMIT { + self.stabilize_delay(); return None; } @@ -405,6 +426,7 @@ impl Bdp { if bw < self.max_bandwidth { // not a faster bandwidth, so don't update + self.stabilize_delay(); return None; } else { self.max_bandwidth = bw; @@ -415,11 +437,26 @@ impl Bdp { if bytes >= self.bdp as usize * 2 / 3 { self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; trace!("BDP increased to {}", self.bdp); + + self.stable_count = 0; + self.ping_delay /= 2; Some(self.bdp) } else { + self.stabilize_delay(); None } } + + fn stabilize_delay(&mut self) { + if self.ping_delay < Duration::from_secs(10) { + self.stable_count += 1; + + if self.stable_count >= 2 { + self.ping_delay *= 4; + self.stable_count = 0; + } + } + } } fn seconds(dur: Duration) -> f64 {