diff --git a/src/proto/h2/ping.rs b/src/proto/h2/ping.rs index 105fc69a39..3ff45cae1e 100644 --- a/src/proto/h2/ping.rs +++ b/src/proto/h2/ping.rs @@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) bdp: wnd, max_bandwidth: 0.0, rtt: 0.0, + ping_delay: Duration::from_millis(100), + stable_count: 0, }); - let bytes = bdp.as_ref().map(|_| 0); + let (bytes, next_bdp_at) = if bdp.is_some() { + (Some(0), Some(Instant::now())) + } else { + (None, None) + }; #[cfg(feature = "runtime")] let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { @@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) is_keep_alive_timed_out: false, ping_pong, ping_sent_at: None, + next_bdp_at, })); ( @@ -125,6 +132,9 @@ struct Shared { /// If `Some`, bdp is enabled, and this tracks how many bytes have been /// read during the current sample. bytes: Option, + /// We delay a variable amount of time between BDP pings. This allows us + /// to send less pings as the bandwidth stabilizes. + next_bdp_at: Option, // keep-alive /// If `Some`, keep-alive is enabled, and the Instant is how long ago @@ -143,6 +153,12 @@ struct Bdp { max_bandwidth: f64, /// Round trip time in seconds rtt: f64, + /// Delay the next ping by this amount. + /// + /// This will change depending on how stable the current bandwidth is. + ping_delay: Duration, + /// The count of ping round trips where BDP has stayed the same. + stable_count: u32, } #[cfg(feature = "runtime")] @@ -207,6 +223,17 @@ impl Recorder { #[cfg(feature = "runtime")] locked.update_last_read_at(); + // are we ready to send another bdp ping? + // if not, we don't need to record bytes either + + if let Some(ref next_bdp_at) = locked.next_bdp_at { + if Instant::now() < *next_bdp_at { + return; + } else { + locked.next_bdp_at = None; + } + } + if let Some(ref mut bytes) = locked.bytes { *bytes += len; } else { @@ -265,6 +292,7 @@ impl Recorder { impl Ponger { pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll { + let now = Instant::now(); let mut locked = self.shared.lock().unwrap(); #[cfg(feature = "runtime")] let is_idle = self.is_idle(); @@ -282,13 +310,13 @@ impl Ponger { return Poll::Pending; } - let (bytes, rtt) = match locked.ping_pong.poll_pong(cx) { + match locked.ping_pong.poll_pong(cx) { Poll::Ready(Ok(_pong)) => { - let rtt = locked + let start = locked .ping_sent_at - .expect("pong received implies ping_sent_at") - .elapsed(); + .expect("pong received implies ping_sent_at"); locked.ping_sent_at = None; + let rtt = now - start; trace!("recv pong"); #[cfg(feature = "runtime")] @@ -299,19 +327,20 @@ impl Ponger { } } - if self.bdp.is_some() { + if let Some(ref mut bdp) = self.bdp { let bytes = locked.bytes.expect("bdp enabled implies bytes"); locked.bytes = Some(0); // reset trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); - (bytes, rtt) - } else { - // no bdp, done! - return Poll::Pending; + + let update = bdp.calculate(bytes, rtt); + locked.next_bdp_at = Some(now + bdp.ping_delay); + if let Some(update) = update { + return Poll::Ready(Ponged::SizeUpdate(update)) + } } } Poll::Ready(Err(e)) => { debug!("pong error: {}", e); - return Poll::Pending; } Poll::Pending => { #[cfg(feature = "runtime")] @@ -324,19 +353,11 @@ impl Ponger { } } } - - return Poll::Pending; } - }; - - drop(locked); - - if let Some(bdp) = self.bdp.as_mut().and_then(|bdp| bdp.calculate(bytes, rtt)) { - Poll::Ready(Ponged::SizeUpdate(bdp)) - } else { - // XXX: this doesn't register a waker...? - Poll::Pending } + + // XXX: this doesn't register a waker...? + Poll::Pending } #[cfg(feature = "runtime")] @@ -386,6 +407,7 @@ impl Bdp { fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option { // No need to do any math if we're at the limit. if self.bdp as usize == BDP_LIMIT { + self.stabilize_delay(); return None; } @@ -405,6 +427,7 @@ impl Bdp { if bw < self.max_bandwidth { // not a faster bandwidth, so don't update + self.stabilize_delay(); return None; } else { self.max_bandwidth = bw; @@ -415,11 +438,26 @@ impl Bdp { if bytes >= self.bdp as usize * 2 / 3 { self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; trace!("BDP increased to {}", self.bdp); + + self.stable_count = 0; + self.ping_delay /= 2; Some(self.bdp) } else { + self.stabilize_delay(); None } } + + fn stabilize_delay(&mut self) { + if self.ping_delay < Duration::from_secs(10) { + self.stable_count += 1; + + if self.stable_count >= 2 { + self.ping_delay *= 4; + self.stable_count = 0; + } + } + } } fn seconds(dur: Duration) -> f64 {