Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically determine the amount of work that can be performed in an endpoint iteration #1133

Merged
merged 1 commit into from Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions quinn/src/endpoint.rs
Expand Up @@ -23,7 +23,8 @@ use crate::{
builders::EndpointBuilder,
connection::Connecting,
platform::{RecvMeta, UdpSocket, BATCH_SIZE},
ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND,
work_limiter::WorkLimiter,
ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -265,6 +266,7 @@ where
/// Number of live handles that can be used to initiate or handle I/O; excludes the driver
ref_count: usize,
driver_lost: bool,
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
idle: Broadcast,
}
Expand All @@ -274,7 +276,7 @@ where
S: proto::crypto::Session + 'static,
{
fn drive_recv<'a>(&'a mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
let mut recvd = 0;
self.recv_limiter.start_cycle();
let mut metas = [RecvMeta::default(); BATCH_SIZE];
let mut iovs = MaybeUninit::<[IoSliceMut<'a>; BATCH_SIZE]>::uninit();
self.recv_buf
Expand All @@ -290,7 +292,7 @@ where
loop {
match self.socket.poll_recv(cx, &mut iovs, &mut metas) {
Poll::Ready(Ok(msgs)) => {
recvd += msgs;
self.recv_limiter.record_work(msgs);
for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) {
let data = buf[0..meta.len].into();
match self
Expand Down Expand Up @@ -326,10 +328,13 @@ where
return Err(e);
}
}
if recvd >= IO_LOOP_BOUND {
if !self.recv_limiter.allow_work() {
self.recv_limiter.finish_cycle();
return Ok(true);
}
}

self.recv_limiter.finish_cycle();
Ok(false)
}

Expand Down Expand Up @@ -518,6 +523,7 @@ where
ref_count: 0,
driver_lost: false,
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
idle: Broadcast::new(),
})))
}
Expand Down
11 changes: 11 additions & 0 deletions quinn/src/lib.rs
Expand Up @@ -50,6 +50,8 @@ let (endpoint, _) = builder.bind(&\"[::]:0\".parse().unwrap()).unwrap();
//! encryption alone.
#![warn(missing_docs)]

use std::time::Duration;

mod broadcast;
mod builders;
mod connection;
Expand All @@ -58,6 +60,7 @@ mod mutex;
mod platform;
mod recv_stream;
mod send_stream;
mod work_limiter;

pub use proto::{
crypto, ApplicationClose, Certificate, CertificateChain, Chunk, ConfigError, ConnectError,
Expand Down Expand Up @@ -160,3 +163,11 @@ enum EndpointEvent {
/// This helps ensure we don't starve anything when the CPU is slower than the link.
/// Value is selected by picking a low number which didn't degrade throughput in benchmarks.
const IO_LOOP_BOUND: usize = 160;

/// The maximum amount of time that should be spent in `recvmsg()` calls per endpoint iteration
///
/// 50us are chosen so that an endpoint iteration with a 50us sendmsg limit blocks
/// the runtime for a maximum of about 100us.
/// Going much lower does not yield any noticeable difference, since a single `recvmmsg`
/// batch of size 32 was observed to take 30us on some systems.
const RECV_TIME_BOUND: Duration = Duration::from_micros(50);
234 changes: 234 additions & 0 deletions quinn/src/work_limiter.rs
@@ -0,0 +1,234 @@
use std::time::{Duration, Instant};

/// Limits the amount of time spent on a certain type of work in a cycle
///
/// The limiter works dynamically: For a sampled subset of cycles it measures
/// the time that is approximately required for fulfilling 1 work item, and
/// calculates the amount of allowed work items per cycle.
/// The estimates are smoothed over all cycles where the exact duration is measured.
///
/// In cycles where no measurement is performed the previously determined work limit
/// is used.
///
/// For the limiter the exact definition of a work item does not matter.
/// It could for example track the amount of transmitted bytes per cycle,
/// or the amount of transmitted datagrams per cycle.
/// It will however work best if the required time to complete a work item is
/// constant.
#[derive(Debug)]
pub struct WorkLimiter {
/// Whether to measure the required work time, or to use the previous estimates
mode: Mode,
/// The current cycle number
cycle: u16,
/// The time the cycle started - only used in measurement mode
start_time: Instant,
/// How many work items have been completed in the cycle
completed: usize,
/// The amount of work items which are allowed for a cycle
allowed: usize,
/// The desired cycle time
desired_cycle_time: Duration,
/// The estimated and smoothed time per work item in nanoseconds
smoothed_time_per_work_item_nanos: f64,
/// Retrieves the current time for unit-test purposes
#[cfg(test)]
get_time: fn() -> Instant,
Ralith marked this conversation as resolved.
Show resolved Hide resolved
}

impl WorkLimiter {
pub fn new(desired_cycle_time: Duration) -> Self {
Self {
mode: Mode::Measure,
cycle: 0,
start_time: Instant::now(),
completed: 0,
allowed: 0,
desired_cycle_time,
smoothed_time_per_work_item_nanos: 0.0,
#[cfg(test)]
get_time: std::time::Instant::now,
}
}

/// Starts one work cycle
pub fn start_cycle(&mut self) {
self.completed = 0;
if let Mode::Measure = self.mode {
self.start_time = self.now();
}
}

/// Returns whether more work can be performed inside the `desired_cycle_time`
///
/// Requires that previous work was tracked using `record_work`.
pub fn allow_work(&mut self) -> bool {
match self.mode {
Mode::Measure => (self.now() - self.start_time) < self.desired_cycle_time,
Mode::HistoricData => self.completed < self.allowed,
}
}

/// Records that `work` additional work items have been completed inside the cycle
///
/// Must be called between `start_cycle` and `finish_cycle`.
pub fn record_work(&mut self, work: usize) {
self.completed += work;
}

/// Finishes one work cycle
///
/// For cycles where the exact duration is measured this will update the estimates
/// for the time per work item and the limit of allowed work items per cycle.
djc marked this conversation as resolved.
Show resolved Hide resolved
/// The estimate is updated using the same exponential averaging (smoothing)
/// mechanism which is used for determining QUIC path rtts: The last value is
/// weighted by 1/8, and the previous average by 7/8.
pub fn finish_cycle(&mut self) {
// If no work was done in the cycle drop the measurement, it won't be useful
if self.completed == 0 {
return;
}

if let Mode::Measure = self.mode {
let elapsed = self.now() - self.start_time;

let time_per_work_item_nanos = (elapsed.as_nanos()) as f64 / self.completed as f64;

self.smoothed_time_per_work_item_nanos = if self.allowed == 0 {
// Initial estimate
time_per_work_item_nanos
} else {
// Smoothed estimate
(7.0 * self.smoothed_time_per_work_item_nanos + time_per_work_item_nanos) / 8.0
};

self.allowed = ((self.desired_cycle_time.as_nanos()) as f64
/ self.smoothed_time_per_work_item_nanos) as usize;
}

self.cycle = self.cycle.wrapping_add(1);
self.mode = match self.cycle % SAMPLING_INTERVAL {
0 => Mode::Measure,
_ => Mode::HistoricData,
};
}

#[cfg(not(test))]
fn now(&self) -> Instant {
Instant::now()
}

#[cfg(test)]
fn now(&self) -> Instant {
(self.get_time)()
}
}

/// We take a measurement sample once every `SAMPLING_INTERVAL` cycles
const SAMPLING_INTERVAL: u16 = 256;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
Measure,
HistoricData,
}

#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;

#[test]
fn limit_work() {
const CYCLE_TIME: Duration = Duration::from_millis(500);
const BATCH_WORK_ITEMS: usize = 12;
const BATCH_TIME: Duration = Duration::from_millis(100);

const EXPECTED_INITIAL_BATCHES: usize =
(CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize;
const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS;

let mut limiter = WorkLimiter::new(CYCLE_TIME);
limiter.get_time = get_time;
reset_time();

// The initial cycle is measuring
limiter.start_cycle();
let mut initial_batches = 0;
while limiter.allow_work() {
limiter.record_work(BATCH_WORK_ITEMS);
advance_time(BATCH_TIME);
initial_batches += 1;
}
limiter.finish_cycle();

assert_eq!(initial_batches, EXPECTED_INITIAL_BATCHES);
assert_eq!(limiter.allowed, EXPECTED_ALLOWED_WORK_ITEMS);
let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos;

// The next cycles are using historic data
const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5];
for &batch_size in &BATCH_SIZES {
limiter.start_cycle();
let mut allowed_work = 0;
while limiter.allow_work() {
limiter.record_work(batch_size);
allowed_work += batch_size;
}
limiter.finish_cycle();

assert_eq!(allowed_work, EXPECTED_ALLOWED_WORK_ITEMS);
}

// After `SAMPLING_INTERVAL`, we get into measurement mode again
for _ in 0..(SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1) {
limiter.start_cycle();
limiter.record_work(1);
limiter.finish_cycle();
}

// We now do more work per cycle, and expect the estimate of allowed
// work items to go up
const BATCH_WORK_ITEMS_2: usize = 96;
const TIME_PER_WORK_ITEMS_2_NANOS: f64 =
CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64;

let expected_updated_time_per_work_item =
(initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0;
let expected_updated_allowed_work_items =
(CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize;

limiter.start_cycle();
let mut initial_batches = 0;
while limiter.allow_work() {
limiter.record_work(BATCH_WORK_ITEMS_2);
advance_time(BATCH_TIME);
initial_batches += 1;
}
limiter.finish_cycle();

assert_eq!(initial_batches, EXPECTED_INITIAL_BATCHES);
assert_eq!(limiter.allowed, expected_updated_allowed_work_items);
}

thread_local! {
/// Mocked time
pub static TIME: RefCell<Instant> = RefCell::new(Instant::now());
}

fn reset_time() {
TIME.with(|t| {
*t.borrow_mut() = Instant::now();
})
}

fn get_time() -> Instant {
TIME.with(|t| *t.borrow())
}

fn advance_time(duration: Duration) {
TIME.with(|t| {
*t.borrow_mut() += duration;
})
}
}