Skip to content

Commit

Permalink
Add a WorkLimiter, and limit the amount of time spent in `Endpoint…
Browse files Browse the repository at this point in the history
…::drive_recv` dynamically

This change adds a `WorkLimiter` component, which measures the amount of
time required to perform some work items and will limit work based on time
instead of pure iterations.

It also changes the `Endpoint`'s `drive_recv` method to limit receive operations
based on the amount of spent time (to 50µs) using the `WorkLimiter`, instead
of using the hardcoded `IO_LOOP_BOUND` counter.

Performance differences are negligible on this machine (probably because
`IO_LOOP_BOUND` was set to a number which works for it), but it can improve
things in less well-known environments.

I instrumented the endpoints receive method to see how much time it spends
on average in `drive_recv`.

**Baseline:**
```
Recv time: AvgTime { total: 3.280880841s, calls: 34559, avg: 94.935µs, min: 3.146µs, max: 312.574µs }
path: PathStats {
    rtt: 511.656µs,
```

**With this change:**
```
Recv time: AvgTime { total: 3.333642823s, calls: 54627, avg: 61.024µs, min: 2.645µs, max: 319.147µs }
path: PathStats {
    rtt: 446.641µs,
```
Note that 50µs are not reached because a single `recvmmsg` batch takes about 30µs, so this is just rounding up to 2 batches.

**When set to 200µs (for comparison purposes):**
```
Recv time: AvgTime { total: 3.243954076s, calls: 19558, avg: 165.862µs, min: 2.525µs, max: 358.711µs }
path: PathStats {
    rtt: 700.34µs,
}
```
  • Loading branch information
Matthias247 committed May 28, 2021
1 parent 5ab895b commit e32fa0d
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 4 deletions.
14 changes: 10 additions & 4 deletions quinn/src/endpoint.rs
Expand Up @@ -23,7 +23,8 @@ use crate::{
builders::EndpointBuilder,
connection::Connecting,
platform::{RecvMeta, UdpSocket, BATCH_SIZE},
ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND,
work_limiter::WorkLimiter,
ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -265,6 +266,7 @@ where
/// Number of live handles that can be used to initiate or handle I/O; excludes the driver
ref_count: usize,
driver_lost: bool,
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
idle: Broadcast,
}
Expand All @@ -274,7 +276,7 @@ where
S: proto::crypto::Session + 'static,
{
fn drive_recv<'a>(&'a mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
let mut recvd = 0;
self.recv_limiter.start_cycle();
let mut metas = [RecvMeta::default(); BATCH_SIZE];
let mut iovs = MaybeUninit::<[IoSliceMut<'a>; BATCH_SIZE]>::uninit();
self.recv_buf
Expand All @@ -290,7 +292,7 @@ where
loop {
match self.socket.poll_recv(cx, &mut iovs, &mut metas) {
Poll::Ready(Ok(msgs)) => {
recvd += msgs;
self.recv_limiter.record_work(msgs);
for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) {
let data = buf[0..meta.len].into();
match self
Expand Down Expand Up @@ -326,10 +328,13 @@ where
return Err(e);
}
}
if recvd >= IO_LOOP_BOUND {
if !self.recv_limiter.allow_work() {
self.recv_limiter.finish_cycle();
return Ok(true);
}
}

self.recv_limiter.finish_cycle();
Ok(false)
}

Expand Down Expand Up @@ -518,6 +523,7 @@ where
ref_count: 0,
driver_lost: false,
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
idle: Broadcast::new(),
})))
}
Expand Down
11 changes: 11 additions & 0 deletions quinn/src/lib.rs
Expand Up @@ -50,6 +50,8 @@ let (endpoint, _) = builder.bind(&\"[::]:0\".parse().unwrap()).unwrap();
//! encryption alone.
#![warn(missing_docs)]

use std::time::Duration;

mod broadcast;
mod builders;
mod connection;
Expand All @@ -58,6 +60,7 @@ mod mutex;
mod platform;
mod recv_stream;
mod send_stream;
mod work_limiter;

pub use proto::{
crypto, ApplicationClose, Certificate, CertificateChain, Chunk, ConfigError, ConnectError,
Expand Down Expand Up @@ -160,3 +163,11 @@ enum EndpointEvent {
/// This helps ensure we don't starve anything when the CPU is slower than the link.
/// Value is selected by picking a low number which didn't degrade throughput in benchmarks.
const IO_LOOP_BOUND: usize = 160;

/// The maximum amount of time that should be spent in `recvmsg()` calls per endpoint iteration
///
/// 50us are chosen so that an endpoint iteration with a 50us sendmsg limit blocks
/// the runtime for a maximum of about 100us.
/// Going much lower does not yield any noticeable difference, since a single `recvmmsg`
/// batch of size 32 was observed to take 30us on some systems.
const RECV_TIME_BOUND: Duration = Duration::from_micros(50);
228 changes: 228 additions & 0 deletions quinn/src/work_limiter.rs
@@ -0,0 +1,228 @@
use std::time::{Duration, Instant};

/// Bounds how much time a cycle may spend on one category of work
///
/// The limiter adapts at runtime: on a sampled subset of cycles it times
/// approximately how long a single work item takes and derives the number of
/// items that fit into the desired cycle duration. The per-item estimates are
/// exponentially smoothed across all cycles where the duration is measured.
///
/// Cycles that are not sampled simply reuse the most recently computed
/// work limit.
///
/// What counts as a "work item" is up to the caller — it could be transmitted
/// bytes per cycle, or transmitted datagrams per cycle. The limiter works
/// best when each item costs a roughly constant amount of time.
#[derive(Debug)]
pub struct WorkLimiter {
    /// Whether this cycle measures elapsed time or replays historic estimates
    mode: Mode,
    /// Counter of completed cycles, used to schedule measurement samples
    cycle: u16,
    /// When the current cycle began; only meaningful while measuring
    start_time: Instant,
    /// Work items finished so far in the current cycle
    completed: usize,
    /// Work-item budget per cycle, derived from past measurements
    allowed: usize,
    /// The target duration of one cycle
    desired_cycle_time: Duration,
    /// Exponentially smoothed per-item cost, in nanoseconds
    smoothed_time_per_work_item_nanos: f64,
}

impl WorkLimiter {
    /// Creates a limiter targeting the given cycle duration.
    ///
    /// The first cycle always runs in measurement mode, since no historic
    /// estimate exists yet.
    pub fn new(desired_cycle_time: Duration) -> Self {
        Self {
            mode: Mode::Measure,
            cycle: 0,
            start_time: Instant::now(),
            completed: 0,
            allowed: 0,
            desired_cycle_time,
            smoothed_time_per_work_item_nanos: 0.0,
        }
    }

    /// Begins a new work cycle.
    pub fn start_cycle(&mut self) {
        self.completed = 0;
        if matches!(self.mode, Mode::Measure) {
            // Only measured cycles need a timestamp; skipping the clock read
            // otherwise keeps the common (historic-data) path cheap.
            self.start_time = Instant::now();
        }
    }

    /// Reports whether more work fits inside the `desired_cycle_time`
    ///
    /// Callers must account for performed work via `record_work`.
    pub fn allow_work(&mut self) -> bool {
        if matches!(self.mode, Mode::Measure) {
            // While measuring, keep going until the time budget is spent.
            self.start_time.elapsed() < self.desired_cycle_time
        } else {
            // Otherwise enforce the precomputed item budget.
            self.completed < self.allowed
        }
    }

    /// Adds `work` completed items to the current cycle's tally.
    ///
    /// Must be called between `start_cycle` and `finish_cycle`.
    pub fn record_work(&mut self, work: usize) {
        self.completed += work;
    }

    /// Ends the current work cycle.
    ///
    /// When the cycle's duration was measured, the per-item cost estimate and
    /// the resulting item budget are refreshed. Smoothing matches the
    /// exponential averaging used for QUIC path RTTs: the new sample is
    /// weighted by 1/8 and the previous average by 7/8.
    pub fn finish_cycle(&mut self) {
        // A cycle without any work yields no usable measurement; discard it.
        if self.completed == 0 {
            return;
        }

        if matches!(self.mode, Mode::Measure) {
            let per_item_nanos =
                (self.start_time.elapsed().as_nanos()) as f64 / self.completed as f64;

            self.smoothed_time_per_work_item_nanos = if self.allowed == 0 {
                // First sample: adopt it directly.
                per_item_nanos
            } else {
                // Subsequent samples: exponential smoothing.
                (7.0 * self.smoothed_time_per_work_item_nanos + per_item_nanos) / 8.0
            };

            self.allowed = ((self.desired_cycle_time.as_nanos()) as f64
                / self.smoothed_time_per_work_item_nanos) as usize;
        }

        self.cycle = self.cycle.wrapping_add(1);
        self.mode = if self.cycle % SAMPLING_INTERVAL == 0 {
            Mode::Measure
        } else {
            Mode::HistoricData
        };
    }
}

/// One cycle out of every `SAMPLING_INTERVAL` cycles is measured
const SAMPLING_INTERVAL: u16 = 256;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Time the cycle and refresh the work-item estimates
    Measure,
    /// Apply the previously computed work budget
    HistoricData,
}

#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end exercise of the limiter's lifecycle: an initial measured
    // cycle, several historic-data cycles, and a re-measurement after
    // `SAMPLING_INTERVAL` cycles. Timing-sensitive: relies on real sleeps,
    // so the `approximates` helper tolerates up to ~33% deviation.
    #[test]
    fn limit_work() {
        const CYCLE_TIME: Duration = Duration::from_millis(500);
        const BATCH_WORK_ITEMS: usize = 12;
        const BATCH_TIME: Duration = Duration::from_millis(100);

        // With a 500ms cycle and 100ms batches we expect ~5 batches per cycle.
        const EXPECTED_INITIAL_BATCHES: usize =
            (CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize;
        const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS;

        let mut limiter = WorkLimiter::new(CYCLE_TIME);

        // The initial cycle is measuring
        limiter.start_cycle();
        let mut initial_batches = 0;
        while limiter.allow_work() {
            limiter.record_work(BATCH_WORK_ITEMS);
            // Simulate a batch that takes BATCH_TIME of real wall-clock time.
            std::thread::sleep(BATCH_TIME);
            initial_batches += 1;
        }
        limiter.finish_cycle();

        assert!(
            approximates(initial_batches, EXPECTED_INITIAL_BATCHES),
            "Expected {} allowed initial batches, but {} had been performed",
            EXPECTED_INITIAL_BATCHES,
            initial_batches
        );

        // The derived per-cycle budget should be close to the measured rate.
        assert!(
            limiter.allowed >= 3 * EXPECTED_ALLOWED_WORK_ITEMS / 4
                && limiter.allowed <= 5 * EXPECTED_ALLOWED_WORK_ITEMS / 4,
            "Expected {} allowed work items, but {} are allowed",
            EXPECTED_ALLOWED_WORK_ITEMS,
            limiter.allowed
        );
        let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos;

        // The next cycles are using historic data
        // Batch sizes that don't divide the budget evenly check that the
        // limit cuts off at approximately, not exactly, `allowed` items.
        const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5];
        for &batch_size in &BATCH_SIZES {
            limiter.start_cycle();
            let mut allowed_work = 0;
            while limiter.allow_work() {
                limiter.record_work(batch_size);
                allowed_work += batch_size;
            }
            limiter.finish_cycle();

            assert!(
                approximates(allowed_work, EXPECTED_ALLOWED_WORK_ITEMS),
                "Expected {} allowed work items, but {} are allowed",
                EXPECTED_ALLOWED_WORK_ITEMS,
                allowed_work
            );
        }

        // After `SAMPLING_INTERVAL`, we get into measurement mode again
        // Burn through the remaining historic-data cycles (minus the one
        // measured and the BATCH_SIZES cycles already performed above).
        for _ in 0..(SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1) {
            limiter.start_cycle();
            limiter.record_work(1);
            limiter.finish_cycle();
        }

        // We now do more work per cycle, and expect the estimate of allowed
        // work items to go up
        const BATCH_WORK_ITEMS_2: usize = 96;
        const TIME_PER_WORK_ITEMS_2_NANOS: f64 =
            CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64;

        // Expected values follow the 7/8 + 1/8 exponential smoothing formula
        // used by `finish_cycle`.
        let expected_updated_time_per_work_item =
            (initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0;
        let expected_updated_allowed_work_items =
            (CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize;

        limiter.start_cycle();
        let mut initial_batches = 0;
        while limiter.allow_work() {
            limiter.record_work(BATCH_WORK_ITEMS_2);
            std::thread::sleep(BATCH_TIME);
            initial_batches += 1;
        }
        limiter.finish_cycle();

        assert!(
            approximates(initial_batches, EXPECTED_INITIAL_BATCHES),
            "Expected {} allowed initial batches, but {} had been performed",
            EXPECTED_INITIAL_BATCHES,
            initial_batches
        );

        assert!(
            approximates(limiter.allowed, expected_updated_allowed_work_items),
            "Expected {} allowed work items, but {} are allowed",
            expected_updated_allowed_work_items,
            limiter.allowed
        );
    }

    /// Checks whether a and b are approximately the same (33% error rate)
    fn approximates(a: usize, b: usize) -> bool {
        a >= b * 2 / 3 && a <= b * 4 / 3
    }
}

0 comments on commit e32fa0d

Please sign in to comment.