Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(http2): reduce adaptive window pings as the BDP stabilizes #2550

Merged
merged 1 commit into from May 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 60 additions & 22 deletions src/proto/h2/ping.rs
Expand Up @@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
bdp: wnd,
max_bandwidth: 0.0,
rtt: 0.0,
ping_delay: Duration::from_millis(100),
stable_count: 0,
});

let bytes = bdp.as_ref().map(|_| 0);
let (bytes, next_bdp_at) = if bdp.is_some() {
(Some(0), Some(Instant::now()))
} else {
(None, None)
};

#[cfg(feature = "runtime")]
let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
Expand All @@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
is_keep_alive_timed_out: false,
ping_pong,
ping_sent_at: None,
next_bdp_at,
}));

(
Expand Down Expand Up @@ -125,6 +132,9 @@ struct Shared {
/// If `Some`, bdp is enabled, and this tracks how many bytes have been
/// read during the current sample.
bytes: Option<usize>,
/// We delay a variable amount of time between BDP pings. This allows us
/// to send less pings as the bandwidth stabilizes.
next_bdp_at: Option<Instant>,

// keep-alive
/// If `Some`, keep-alive is enabled, and the Instant is how long ago
Expand All @@ -143,6 +153,12 @@ struct Bdp {
max_bandwidth: f64,
/// Round trip time in seconds
rtt: f64,
/// Delay the next ping by this amount.
///
/// This will change depending on how stable the current bandwidth is.
ping_delay: Duration,
/// The count of ping round trips where BDP has stayed the same.
stable_count: u32,
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -207,6 +223,17 @@ impl Recorder {
#[cfg(feature = "runtime")]
locked.update_last_read_at();

// are we ready to send another bdp ping?
// if not, we don't need to record bytes either

if let Some(ref next_bdp_at) = locked.next_bdp_at {
if Instant::now() < *next_bdp_at {
return;
} else {
locked.next_bdp_at = None;
}
}

if let Some(ref mut bytes) = locked.bytes {
*bytes += len;
} else {
Expand Down Expand Up @@ -265,6 +292,7 @@ impl Recorder {

impl Ponger {
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
let now = Instant::now();
let mut locked = self.shared.lock().unwrap();
#[cfg(feature = "runtime")]
let is_idle = self.is_idle();
Expand All @@ -282,13 +310,13 @@ impl Ponger {
return Poll::Pending;
}

let (bytes, rtt) = match locked.ping_pong.poll_pong(cx) {
match locked.ping_pong.poll_pong(cx) {
Poll::Ready(Ok(_pong)) => {
let rtt = locked
let start = locked
.ping_sent_at
.expect("pong received implies ping_sent_at")
.elapsed();
.expect("pong received implies ping_sent_at");
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
locked.ping_sent_at = None;
let rtt = now - start;
trace!("recv pong");

#[cfg(feature = "runtime")]
Expand All @@ -299,19 +327,20 @@ impl Ponger {
}
}

if self.bdp.is_some() {
if let Some(ref mut bdp) = self.bdp {
let bytes = locked.bytes.expect("bdp enabled implies bytes");
locked.bytes = Some(0); // reset
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
(bytes, rtt)
} else {
// no bdp, done!
return Poll::Pending;

let update = bdp.calculate(bytes, rtt);
locked.next_bdp_at = Some(now + bdp.ping_delay);
if let Some(update) = update {
return Poll::Ready(Ponged::SizeUpdate(update))
}
}
}
Poll::Ready(Err(e)) => {
debug!("pong error: {}", e);
return Poll::Pending;
}
Poll::Pending => {
#[cfg(feature = "runtime")]
Expand All @@ -324,19 +353,11 @@ impl Ponger {
}
}
}

return Poll::Pending;
}
};

drop(locked);

if let Some(bdp) = self.bdp.as_mut().and_then(|bdp| bdp.calculate(bytes, rtt)) {
Poll::Ready(Ponged::SizeUpdate(bdp))
} else {
// XXX: this doesn't register a waker...?
Poll::Pending
}

// XXX: this doesn't register a waker...?
Poll::Pending
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -386,6 +407,7 @@ impl Bdp {
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
// No need to do any math if we're at the limit.
if self.bdp as usize == BDP_LIMIT {
self.stabilize_delay();
return None;
}

Expand All @@ -405,6 +427,7 @@ impl Bdp {

if bw < self.max_bandwidth {
// not a faster bandwidth, so don't update
self.stabilize_delay();
return None;
} else {
self.max_bandwidth = bw;
Expand All @@ -415,11 +438,26 @@ impl Bdp {
if bytes >= self.bdp as usize * 2 / 3 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);

self.stable_count = 0;
self.ping_delay /= 2;
Some(self.bdp)
} else {
self.stabilize_delay();
None
}
}

fn stabilize_delay(&mut self) {
if self.ping_delay < Duration::from_secs(10) {
self.stable_count += 1;

if self.stable_count >= 2 {
self.ping_delay *= 4;
self.stable_count = 0;
}
}
}
}

fn seconds(dur: Duration) -> f64 {
Expand Down