Skip to content

Commit

Permalink
perf(http2): slow adaptive window pings as the BDP stabilizes (hyperi…
Browse files Browse the repository at this point in the history
…um#2550)

This introduces a delay to sending a ping to calculate the BDP that
becomes shorter as the BDP is changing, to improve throughput quickly,
but then also becomes longer as the BDP stabilizes, to reduce the amount
of pings sent. This improved the performance of the adaptive window
end_to_end benchmark.

It should also reduce the amount of pings the remote has to deal with,
hopefully preventing hyper from triggering ENHANCE_YOUR_CALM errors.
  • Loading branch information
seanmonstar authored and Benxiang Ge committed Jul 26, 2021
1 parent 4f704df commit b3e96bf
Showing 1 changed file with 60 additions and 22 deletions.
82 changes: 60 additions & 22 deletions src/proto/h2/ping.rs
Expand Up @@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
bdp: wnd,
max_bandwidth: 0.0,
rtt: 0.0,
ping_delay: Duration::from_millis(100),
stable_count: 0,
});

let bytes = bdp.as_ref().map(|_| 0);
let (bytes, next_bdp_at) = if bdp.is_some() {
(Some(0), Some(Instant::now()))
} else {
(None, None)
};

#[cfg(feature = "runtime")]
let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
Expand All @@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
is_keep_alive_timed_out: false,
ping_pong,
ping_sent_at: None,
next_bdp_at,
}));

(
Expand Down Expand Up @@ -125,6 +132,9 @@ struct Shared {
/// If `Some`, bdp is enabled, and this tracks how many bytes have been
/// read during the current sample.
bytes: Option<usize>,
/// We delay a variable amount of time between BDP pings. This allows us
/// to send less pings as the bandwidth stabilizes.
next_bdp_at: Option<Instant>,

// keep-alive
/// If `Some`, keep-alive is enabled, and the Instant is how long ago
Expand All @@ -143,6 +153,12 @@ struct Bdp {
max_bandwidth: f64,
/// Round trip time in seconds
rtt: f64,
/// Delay the next ping by this amount.
///
/// This will change depending on how stable the current bandwidth is.
ping_delay: Duration,
/// The count of ping round trips where BDP has stayed the same.
stable_count: u32,
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -207,6 +223,17 @@ impl Recorder {
#[cfg(feature = "runtime")]
locked.update_last_read_at();

// are we ready to send another bdp ping?
// if not, we don't need to record bytes either

if let Some(ref next_bdp_at) = locked.next_bdp_at {
if Instant::now() < *next_bdp_at {
return;
} else {
locked.next_bdp_at = None;
}
}

if let Some(ref mut bytes) = locked.bytes {
*bytes += len;
} else {
Expand Down Expand Up @@ -265,6 +292,7 @@ impl Recorder {

impl Ponger {
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
let now = Instant::now();
let mut locked = self.shared.lock().unwrap();
#[cfg(feature = "runtime")]
let is_idle = self.is_idle();
Expand All @@ -282,13 +310,13 @@ impl Ponger {
return Poll::Pending;
}

let (bytes, rtt) = match locked.ping_pong.poll_pong(cx) {
match locked.ping_pong.poll_pong(cx) {
Poll::Ready(Ok(_pong)) => {
let rtt = locked
let start = locked
.ping_sent_at
.expect("pong received implies ping_sent_at")
.elapsed();
.expect("pong received implies ping_sent_at");
locked.ping_sent_at = None;
let rtt = now - start;
trace!("recv pong");

#[cfg(feature = "runtime")]
Expand All @@ -299,19 +327,20 @@ impl Ponger {
}
}

if self.bdp.is_some() {
if let Some(ref mut bdp) = self.bdp {
let bytes = locked.bytes.expect("bdp enabled implies bytes");
locked.bytes = Some(0); // reset
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
(bytes, rtt)
} else {
// no bdp, done!
return Poll::Pending;

let update = bdp.calculate(bytes, rtt);
locked.next_bdp_at = Some(now + bdp.ping_delay);
if let Some(update) = update {
return Poll::Ready(Ponged::SizeUpdate(update))
}
}
}
Poll::Ready(Err(e)) => {
debug!("pong error: {}", e);
return Poll::Pending;
}
Poll::Pending => {
#[cfg(feature = "runtime")]
Expand All @@ -324,19 +353,11 @@ impl Ponger {
}
}
}

return Poll::Pending;
}
};

drop(locked);

if let Some(bdp) = self.bdp.as_mut().and_then(|bdp| bdp.calculate(bytes, rtt)) {
Poll::Ready(Ponged::SizeUpdate(bdp))
} else {
// XXX: this doesn't register a waker...?
Poll::Pending
}

// XXX: this doesn't register a waker...?
Poll::Pending
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -386,6 +407,7 @@ impl Bdp {
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
// No need to do any math if we're at the limit.
if self.bdp as usize == BDP_LIMIT {
self.stabilize_delay();
return None;
}

Expand All @@ -405,6 +427,7 @@ impl Bdp {

if bw < self.max_bandwidth {
// not a faster bandwidth, so don't update
self.stabilize_delay();
return None;
} else {
self.max_bandwidth = bw;
Expand All @@ -415,11 +438,26 @@ impl Bdp {
if bytes >= self.bdp as usize * 2 / 3 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);

self.stable_count = 0;
self.ping_delay /= 2;
Some(self.bdp)
} else {
self.stabilize_delay();
None
}
}

fn stabilize_delay(&mut self) {
if self.ping_delay < Duration::from_secs(10) {
self.stable_count += 1;

if self.stable_count >= 2 {
self.ping_delay *= 4;
self.stable_count = 0;
}
}
}
}

fn seconds(dur: Duration) -> f64 {
Expand Down

0 comments on commit b3e96bf

Please sign in to comment.