diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 549040ff1..3a0b5b6c6 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -23,7 +23,8 @@ use crate::{ builders::EndpointBuilder, connection::Connecting, platform::{RecvMeta, UdpSocket, BATCH_SIZE}, - ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, + work_limiter::WorkLimiter, + ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, }; /// A QUIC endpoint. @@ -265,6 +266,7 @@ where /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: usize, driver_lost: bool, + recv_limiter: WorkLimiter, recv_buf: Box<[u8]>, idle: Broadcast, } @@ -274,7 +276,7 @@ where S: proto::crypto::Session + 'static, { fn drive_recv<'a>(&'a mut self, cx: &mut Context, now: Instant) -> Result { - let mut recvd = 0; + self.recv_limiter.start_cycle(); let mut metas = [RecvMeta::default(); BATCH_SIZE]; let mut iovs = MaybeUninit::<[IoSliceMut<'a>; BATCH_SIZE]>::uninit(); self.recv_buf @@ -290,7 +292,7 @@ where loop { match self.socket.poll_recv(cx, &mut iovs, &mut metas) { Poll::Ready(Ok(msgs)) => { - recvd += msgs; + self.recv_limiter.record_work(msgs); for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) { let data = buf[0..meta.len].into(); match self @@ -326,10 +328,13 @@ where return Err(e); } } - if recvd >= IO_LOOP_BOUND { + if !self.recv_limiter.allow_work() { + self.recv_limiter.finish_cycle(); return Ok(true); } } + + self.recv_limiter.finish_cycle(); Ok(false) } @@ -518,6 +523,7 @@ where ref_count: 0, driver_lost: false, recv_buf: recv_buf.into(), + recv_limiter: WorkLimiter::new(RECV_TIME_BOUND), idle: Broadcast::new(), }))) } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 4720e9876..7e676df04 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -50,6 +50,8 @@ let (endpoint, _) = builder.bind(&\"[::]:0\".parse().unwrap()).unwrap(); //! encryption alone. 
#![warn(missing_docs)] +use std::time::Duration; + mod broadcast; mod builders; mod connection; @@ -58,6 +60,7 @@ mod mutex; mod platform; mod recv_stream; mod send_stream; +mod work_limiter; pub use proto::{ crypto, ApplicationClose, Certificate, CertificateChain, Chunk, ConfigError, ConnectError, @@ -160,3 +163,11 @@ enum EndpointEvent { /// This helps ensure we don't starve anything when the CPU is slower than the link. /// Value is selected by picking a low number which didn't degrade throughput in benchmarks. const IO_LOOP_BOUND: usize = 160; + +/// The maximum amount of time that should be spent in `recvmsg()` calls per endpoint iteration +/// +/// 50us are chosen so that an endpoint iteration with a 50us sendmsg limit blocks +/// the runtime for a maximum of about 100us. +/// Going much lower does not yield any noticeable difference, since a single `recvmmsg` +/// batch of size 32 was observed to take 30us on some systems. +const RECV_TIME_BOUND: Duration = Duration::from_micros(50); diff --git a/quinn/src/work_limiter.rs b/quinn/src/work_limiter.rs new file mode 100644 index 000000000..4264de758 --- /dev/null +++ b/quinn/src/work_limiter.rs @@ -0,0 +1,234 @@ +use std::time::{Duration, Instant}; + +/// Limits the amount of time spent on a certain type of work in a cycle +/// +/// The limiter works dynamically: For a sampled subset of cycles it measures +/// the time that is approximately required for fulfilling 1 work item, and +/// calculates the amount of allowed work items per cycle. +/// The estimates are smoothed over all cycles where the exact duration is measured. +/// +/// In cycles where no measurement is performed the previously determined work limit +/// is used. +/// +/// For the limiter the exact definition of a work item does not matter. +/// It could for example track the amount of transmitted bytes per cycle, +/// or the amount of transmitted datagrams per cycle. 
/// Limits the amount of time spent on a certain type of work in a cycle
///
/// The limiter works dynamically: for a sampled subset of cycles it measures
/// the time that is approximately required for fulfilling 1 work item, and
/// calculates the amount of allowed work items per cycle.
/// The estimates are smoothed over all cycles where the exact duration is measured.
///
/// In cycles where no measurement is performed the previously determined
/// work limit is used.
///
/// For the limiter the exact definition of a work item does not matter.
/// It could for example track the amount of transmitted bytes per cycle,
/// or the amount of transmitted datagrams per cycle.
/// It will however work best if the required time to complete a work item is
/// constant.
#[derive(Debug)]
pub struct WorkLimiter {
    /// Whether to measure the required work time, or to use the previous estimates
    mode: Mode,
    /// The current cycle number
    cycle: u16,
    /// The time the cycle started - only used in measurement mode
    start_time: Instant,
    /// How many work items have been completed in the cycle
    completed: usize,
    /// The amount of work items which are allowed for a cycle
    allowed: usize,
    /// The desired cycle time
    desired_cycle_time: Duration,
    /// The estimated and smoothed time per work item in nanoseconds
    smoothed_time_per_work_item_nanos: f64,
    /// Retrieves the current time for unit-test purposes
    #[cfg(test)]
    get_time: fn() -> Instant,
}

impl WorkLimiter {
    /// Creates a limiter which aims to spend at most `desired_cycle_time` per cycle
    pub fn new(desired_cycle_time: Duration) -> Self {
        Self {
            mode: Mode::Measure,
            cycle: 0,
            start_time: Instant::now(),
            completed: 0,
            allowed: 0,
            desired_cycle_time,
            smoothed_time_per_work_item_nanos: 0.0,
            #[cfg(test)]
            get_time: std::time::Instant::now,
        }
    }

    /// Starts one work cycle
    pub fn start_cycle(&mut self) {
        self.completed = 0;
        // A timestamp is only needed for cycles whose duration is measured
        if self.mode == Mode::Measure {
            self.start_time = self.now();
        }
    }

    /// Returns whether more work can be performed inside the `desired_cycle_time`
    ///
    /// Requires that previous work was tracked using `record_work`.
    pub fn allow_work(&mut self) -> bool {
        match self.mode {
            // While measuring, allow work until the wall-clock budget is used up
            Mode::Measure => (self.now() - self.start_time) < self.desired_cycle_time,
            // Otherwise rely on the previously computed work-item budget
            Mode::HistoricData => self.completed < self.allowed,
        }
    }

    /// Records that `work` additional work items have been completed inside the cycle
    ///
    /// Must be called between `start_cycle` and `finish_cycle`.
    pub fn record_work(&mut self, work: usize) {
        self.completed += work;
    }

    /// Finishes one work cycle
    ///
    /// For cycles where the exact duration is measured this will update the estimates
    /// for the time per work item and the limit of allowed work items per cycle.
    /// The estimate is updated using the same exponential averaging (smoothing)
    /// mechanism which is used for determining QUIC path rtts: the last value is
    /// weighted by 1/8, and the previous average by 7/8.
    pub fn finish_cycle(&mut self) {
        // If no work was done in the cycle drop the measurement, it won't be useful
        if self.completed == 0 {
            return;
        }

        if self.mode == Mode::Measure {
            let elapsed_nanos = (self.now() - self.start_time).as_nanos() as f64;
            let time_per_work_item_nanos = elapsed_nanos / self.completed as f64;

            // `allowed == 0` means no previous estimate exists yet
            self.smoothed_time_per_work_item_nanos = if self.allowed == 0 {
                // Initial estimate
                time_per_work_item_nanos
            } else {
                // Smoothed estimate: 1/8 new sample, 7/8 history
                (7.0 * self.smoothed_time_per_work_item_nanos + time_per_work_item_nanos) / 8.0
            };

            self.allowed = (self.desired_cycle_time.as_nanos() as f64
                / self.smoothed_time_per_work_item_nanos) as usize;
        }

        self.cycle = self.cycle.wrapping_add(1);
        // Re-measure once every `SAMPLING_INTERVAL` cycles
        self.mode = if self.cycle % SAMPLING_INTERVAL == 0 {
            Mode::Measure
        } else {
            Mode::HistoricData
        };
    }

    #[cfg(not(test))]
    fn now(&self) -> Instant {
        Instant::now()
    }

    #[cfg(test)]
    fn now(&self) -> Instant {
        (self.get_time)()
    }
}

/// We take a measurement sample once every `SAMPLING_INTERVAL` cycles
const SAMPLING_INTERVAL: u16 = 256;

/// Whether a cycle's duration is measured, or the historic estimate is reused
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Measure,
    HistoricData,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;

    #[test]
    fn limit_work() {
        const CYCLE_TIME: Duration = Duration::from_millis(500);
        const BATCH_WORK_ITEMS: usize = 12;
        const BATCH_TIME: Duration = Duration::from_millis(100);

        const EXPECTED_INITIAL_BATCHES: usize =
            (CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize;
        const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS;

        let mut limiter = WorkLimiter::new(CYCLE_TIME);
        limiter.get_time = get_time;
        reset_time();

        // The initial cycle is measuring
        limiter.start_cycle();
        let mut initial_batches = 0;
        while limiter.allow_work() {
            limiter.record_work(BATCH_WORK_ITEMS);
            advance_time(BATCH_TIME);
            initial_batches += 1;
        }
        limiter.finish_cycle();

        assert_eq!(initial_batches, EXPECTED_INITIAL_BATCHES);
        assert_eq!(limiter.allowed, EXPECTED_ALLOWED_WORK_ITEMS);
        let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos;

        // The next cycles are using historic data
        const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5];
        for &batch_size in &BATCH_SIZES {
            limiter.start_cycle();
            let mut allowed_work = 0;
            while limiter.allow_work() {
                limiter.record_work(batch_size);
                allowed_work += batch_size;
            }
            limiter.finish_cycle();

            assert_eq!(allowed_work, EXPECTED_ALLOWED_WORK_ITEMS);
        }

        // After `SAMPLING_INTERVAL`, we get into measurement mode again
        for _ in 0..(SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1) {
            limiter.start_cycle();
            limiter.record_work(1);
            limiter.finish_cycle();
        }

        // We now do more work per cycle, and expect the estimate of allowed
        // work items to go up
        const BATCH_WORK_ITEMS_2: usize = 96;
        const TIME_PER_WORK_ITEMS_2_NANOS: f64 =
            CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64;

        let expected_updated_time_per_work_item =
            (initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0;
        let expected_updated_allowed_work_items =
            (CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize;

        limiter.start_cycle();
        let mut initial_batches = 0;
        while limiter.allow_work() {
            limiter.record_work(BATCH_WORK_ITEMS_2);
            advance_time(BATCH_TIME);
            initial_batches += 1;
        }
        limiter.finish_cycle();

        assert_eq!(initial_batches, EXPECTED_INITIAL_BATCHES);
        assert_eq!(limiter.allowed, expected_updated_allowed_work_items);
    }

    thread_local! {
        /// Mocked time
        // Fix: the generic parameter was missing (`RefCell` alone does not compile)
        pub static TIME: RefCell<Instant> = RefCell::new(Instant::now());
    }

    fn reset_time() {
        TIME.with(|t| {
            *t.borrow_mut() = Instant::now();
        })
    }

    fn get_time() -> Instant {
        TIME.with(|t| *t.borrow())
    }

    fn advance_time(duration: Duration) {
        TIME.with(|t| {
            *t.borrow_mut() += duration;
        })
    }
}