Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't update remote client CIDs gratuitously #1294

Merged
merged 8 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
165 changes: 103 additions & 62 deletions quinn-proto/src/cid_queue.rs
@@ -1,6 +1,6 @@
use std::ops::Range;

use crate::{shared::IssuedCid, ConnectionId, ResetToken};
use crate::{frame::NewConnectionId, ConnectionId, ResetToken};

/// DataType stored in CidQueue buffer
type CidData = (ConnectionId, Option<ResetToken>);
Expand Down Expand Up @@ -31,60 +31,83 @@ impl CidQueue {
}
}

pub fn insert(&mut self, cid: IssuedCid) -> Result<(), InsertError> {
if cid.sequence == self.offset && self.buffer[self.cursor].is_some() {
// This is a duplicate of the active CID.
return Ok(());
}
let index = cid
.sequence
.checked_sub(self.offset)
.ok_or(InsertError::Retired)?;
if index >= Self::LEN as u64 {
/// Handle a `NEW_CONNECTION_ID` frame
///
/// Returns a non-empty range of retired sequence numbers and the reset token of the new active
/// CID iff any CIDs were retired.
pub fn insert(
&mut self,
cid: NewConnectionId,
) -> Result<Option<(Range<u64>, ResetToken)>, InsertError> {
// Position of new CID wrt. the current active CID
let index = match cid.sequence.checked_sub(self.offset) {
None => return Err(InsertError::Retired),
Some(x) => x,
};

let retired_count = cid.retire_prior_to.saturating_sub(self.offset);
Ralith marked this conversation as resolved.
Show resolved Hide resolved
if index >= Self::LEN as u64 + retired_count {
return Err(InsertError::ExceedsLimit);
}
let index = (self.cursor + index as usize) % Self::LEN;
self.buffer[index] = Some((cid.id, Some(cid.reset_token)));
Ok(())
}

/// Returns the possibly-empty range of newly retired CIDs
pub fn retire_prior_to(&mut self, sequence: u64) -> Range<u64> {
let n = match sequence.checked_sub(self.offset) {
None => return 0..0,
Some(n) => n as usize,
};
for i in 0..n {
// Discard retired CIDs, if any
for i in 0..(retired_count.min(Self::LEN as u64) as usize) {
self.buffer[(self.cursor + i) % Self::LEN] = None;
}

// Record the new CID
let index = ((self.cursor as u64 + index) % Self::LEN as u64) as usize;
self.buffer[index] = Some((cid.id, Some(cid.reset_token)));

if retired_count == 0 {
return Ok(None);
}

// The active CID was retired. Find the first known CID with sequence number of at least
Ralith marked this conversation as resolved.
Show resolved Hide resolved
// retire_prior_to, and inform the caller that all prior CIDs have been retired, and of
// the new CID's reset token.
self.cursor = ((self.cursor as u64 + retired_count) % Self::LEN as u64) as usize;
let (i, (_, token)) = self
.iter()
.next()
.expect("it is impossible to retire a CID without supplying a new one");
self.cursor = (self.cursor + i) % Self::LEN;
let orig_offset = self.offset;
self.offset = sequence;
self.cursor = (self.cursor + n) % Self::LEN;
orig_offset..sequence
self.offset = cid.retire_prior_to + i as u64;
Ralith marked this conversation as resolved.
Show resolved Hide resolved
// We don't immediately retire CIDs in the range (orig_offset +
// Self::LEN)..self.offset. These are CIDs that we haven't yet received from a
// NEW_CONNECTION_ID frame, since having previously received them would violate the
// connection ID limit we specified based on Self::LEN. If we do receive a such a frame
// in the future, e.g. due to reordering, we'll retire it then. This ensures we can't be
// made to buffer an arbitrarily large number of RETIRE_CONNECTION_ID frames.
Ok(Some((
orig_offset..self.offset.min(orig_offset + Self::LEN as u64),
token.expect("non-initial CID missing reset token"),
)))
}

/// Switch to next active CID if possible, return
/// 1) the corresponding ResetToken and 2) a possibly-empty range preceding it to retire
/// 1) the corresponding ResetToken and 2) a non-empty range preceding it to retire
pub fn next(&mut self) -> Option<(ResetToken, Range<u64>)> {
let (i, cid_data) = self.iter().next()?;
let (i, cid_data) = self.iter().nth(1)?;
self.buffer[self.cursor] = None;

let orig_offset = self.offset;
self.offset += i as u64;
self.cursor = (self.cursor + i) % Self::LEN;
let sequence = orig_offset + i as u64;
Some((cid_data.1.unwrap(), orig_offset..sequence))
Some((cid_data.1.unwrap(), orig_offset..self.offset))
}

/// Iterate inactive CIDs in CidQueue that are not `None`
/// Iterate CIDs in CidQueue that are not `None`, including the active CID
fn iter(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(1..Self::LEN).filter_map(move |step| {
(0..Self::LEN).filter_map(move |step| {
let index = (self.cursor + step) % Self::LEN;
self.buffer[index].map(|cid_data| (step, cid_data))
})
}

pub fn update_cid(&mut self, cid: ConnectionId) {
/// Replace the initial CID
pub fn update_initial_cid(&mut self, cid: ConnectionId) {
debug_assert_eq!(self.offset, 0);
self.buffer[self.cursor] = Some((cid, None));
}
Expand All @@ -94,11 +117,6 @@ impl CidQueue {
self.buffer[self.cursor].unwrap().0
}

/// Check whether self.offset points to a valid CID in CidQueue
pub fn is_active_retired(&mut self) -> bool {
self.buffer[self.cursor].is_none()
}

/// Return the sequence number of active remote CID
pub fn active_seq(&self) -> u64 {
self.offset
Expand All @@ -119,11 +137,12 @@ pub enum InsertError {
mod tests {
use super::*;

fn cid(sequence: u64) -> IssuedCid {
IssuedCid {
fn cid(sequence: u64, retire_prior_to: u64) -> NewConnectionId {
NewConnectionId {
sequence,
id: ConnectionId::new(&[0xAB; 8]),
reset_token: ResetToken::from([0xCD; crate::RESET_TOKEN_SIZE]),
retire_prior_to,
}
}

Expand All @@ -138,7 +157,7 @@ mod tests {
assert!(q.next().is_none());

for i in 1..CidQueue::LEN as u64 {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
for i in 1..CidQueue::LEN as u64 {
let (_, retire) = q.next().unwrap();
Expand All @@ -152,7 +171,7 @@ mod tests {
let mut q = CidQueue::new(initial_cid());
let seqs = (1..CidQueue::LEN as u64).filter(|x| x % 2 == 0);
for i in seqs.clone() {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
for i in seqs {
let (_, retire) = q.next().unwrap();
Expand All @@ -168,13 +187,13 @@ mod tests {
let mut q = CidQueue::new(initial_cid());

for i in 1..CidQueue::LEN as u64 {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
for _ in 1..(CidQueue::LEN as u64 - 1) {
q.next().unwrap();
}
for i in CidQueue::LEN as u64..(CidQueue::LEN as u64 + 3) {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
for i in (CidQueue::LEN as u64 - 1)..(CidQueue::LEN as u64 + 3) {
q.next().unwrap();
Expand All @@ -184,55 +203,77 @@ mod tests {
}

#[test]
fn retire() {
fn retire_dense() {
let mut q = CidQueue::new(initial_cid());

for i in 1..CidQueue::LEN as u64 {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
assert_eq!(q.active_seq(), 0);

assert_eq!(q.retire_prior_to(2), 0..2);
let r = q.retire_prior_to(2);
assert_eq!(r.end - r.start, 0);
assert_eq!(q.insert(cid(4, 2)).unwrap().unwrap().0, 0..2);
assert_eq!(q.active_seq(), 2);
assert_eq!(q.insert(cid(4, 2)), Ok(None));

for i in 2..(CidQueue::LEN as u64 - 1) {
let _ = q.next().unwrap();
assert_eq!(q.active_seq(), i + 1);
let retire = q.retire_prior_to(i + 1);
assert_eq!(retire.end - retire.start, 0);
assert!(!q.is_active_retired());
assert_eq!(q.insert(cid(i + 1, i + 1)), Ok(None));
}

assert!(q.next().is_none());
assert!(!q.is_active_retired());
}

#[test]
fn retire_sparse() {
// Retiring CID 0 when CID 1 is not known should retire CID 1 as we move to CID 2
let mut q = CidQueue::new(initial_cid());
q.insert(cid(2, 0)).unwrap();
assert_eq!(q.insert(cid(3, 1)).unwrap().unwrap().0, 0..2,);
assert_eq!(q.active_seq(), 2);
}

#[test]
fn retire_many() {
let mut q = CidQueue::new(initial_cid());
q.insert(cid(2, 0)).unwrap();
assert_eq!(
q.insert(cid(1_000_000, 1_000_000)).unwrap().unwrap().0,
0..CidQueue::LEN as u64,
);
assert_eq!(q.active_seq(), 1_000_000);
}

#[test]
fn insert_limit() {
let mut q = CidQueue::new(initial_cid());
assert_eq!(q.insert(cid(CidQueue::LEN as u64 - 1)), Ok(()));
assert_eq!(q.insert(cid(CidQueue::LEN as u64 - 1, 0)), Ok(None));
assert_eq!(
q.insert(cid(CidQueue::LEN as u64)),
q.insert(cid(CidQueue::LEN as u64, 0)),
Err(InsertError::ExceedsLimit)
);
}

#[test]
fn insert_duplicate() {
let mut q = CidQueue::new(initial_cid());
q.insert(cid(0)).unwrap();
q.insert(cid(0)).unwrap();
q.insert(cid(0, 0)).unwrap();
q.insert(cid(0, 0)).unwrap();
}

#[test]
fn insert_retired() {
let mut q = CidQueue::new(initial_cid());
assert_eq!(q.insert(cid(0)), Ok(()), "reinserting active CID succeeds");
assert_eq!(
q.insert(cid(0, 0)),
Ok(None),
"reinserting active CID succeeds"
);
assert!(q.next().is_none(), "active CID isn't requeued");
q.insert(cid(1)).unwrap();
q.insert(cid(1, 0)).unwrap();
q.next().unwrap();
assert_eq!(
q.insert(cid(0)),
q.insert(cid(0, 0)),
Err(InsertError::Retired),
"previous active CID is already retired"
);
Expand All @@ -242,12 +283,12 @@ mod tests {
fn retire_then_insert_next() {
let mut q = CidQueue::new(initial_cid());
for i in 1..CidQueue::LEN as u64 {
q.insert(cid(i)).unwrap();
q.insert(cid(i, 0)).unwrap();
}
q.next().unwrap();
q.insert(cid(CidQueue::LEN as u64)).unwrap();
q.insert(cid(CidQueue::LEN as u64, 0)).unwrap();
assert_eq!(
q.insert(cid(CidQueue::LEN as u64 + 1)),
q.insert(cid(CidQueue::LEN as u64 + 1, 0)),
Err(InsertError::ExceedsLimit)
);
}
Expand Down