Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identify connections by four-tuple #1306

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 64 additions & 43 deletions quinn-proto/src/endpoint.rs
Expand Up @@ -51,7 +51,7 @@ pub struct Endpoint {
/// Identifies connections with zero-length CIDs
///
/// Uses a standard `HashMap` to protect against hash collision attacks.
connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
connection_remotes: HashMap<FourTuple, ConnectionHandle>,
/// Reset tokens provided by the peer for the CID each connection is currently sending to
///
/// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Endpoint {
for cid in conn.loc_cids.values() {
self.connection_ids.remove(cid);
}
self.connection_remotes.remove(&conn.initial_remote);
self.connection_remotes.remove(&conn.addresses);
if let Some((remote, token)) = conn.reset_token {
self.connection_reset_tokens.remove(remote, token);
}
Expand Down Expand Up @@ -203,6 +203,7 @@ impl Endpoint {
// Handle packet on existing connection, if any
//

let addresses = FourTuple { remote, local_ip };
let dst_cid = first_decode.dst_cid();
let known_ch = {
let ch = if self.local_cid_generator.cid_len() > 0 {
Expand All @@ -219,7 +220,7 @@ impl Endpoint {
})
.or_else(|| {
if self.local_cid_generator.cid_len() == 0 {
self.connection_remotes.get(&remote)
self.connection_remotes.get(&addresses)
} else {
None
}
Expand All @@ -230,7 +231,7 @@ impl Endpoint {
return None;
}
self.connection_reset_tokens
.get(remote, &data[data.len() - RESET_TOKEN_SIZE..])
.get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
})
.cloned()
};
Expand All @@ -239,7 +240,7 @@ impl Endpoint {
ch,
DatagramEvent::ConnectionEvent(ConnectionEvent(ConnectionEventInner::Datagram {
now,
remote,
remote: addresses.remote,
ecn,
first_decode,
remaining,
Expand All @@ -255,7 +256,7 @@ impl Endpoint {
Some(config) => config,
None => {
debug!("packet for unrecognized connection {}", dst_cid);
self.stateless_reset(datagram_len, remote, local_ip, &dst_cid);
self.stateless_reset(datagram_len, addresses, &dst_cid);
return None;
}
};
Expand Down Expand Up @@ -283,7 +284,7 @@ impl Endpoint {
};
return match first_decode.finish(Some(&*crypto.header.remote)) {
Ok(packet) => self
.handle_first_packet(now, remote, local_ip, ecn, packet, remaining, &crypto)
.handle_first_packet(now, addresses, ecn, packet, remaining, &crypto)
.map(|(ch, conn)| (ch, DatagramEvent::NewConnection(conn))),
Err(e) => {
trace!("unable to decode initial packet: {}", e);
Expand All @@ -304,7 +305,7 @@ impl Endpoint {
//

if !dst_cid.is_empty() {
self.stateless_reset(datagram_len, remote, local_ip, &dst_cid);
self.stateless_reset(datagram_len, addresses, &dst_cid);
} else {
trace!("dropping unrecognized short packet without ID");
}
Expand All @@ -314,8 +315,7 @@ impl Endpoint {
fn stateless_reset(
&mut self,
inciting_dgram_len: usize,
remote: SocketAddr,
local_ip: Option<IpAddr>,
addresses: FourTuple,
dst_cid: &ConnectionId,
) {
/// Minimum amount of padding for the stateless reset to look like a short-header packet
Expand All @@ -331,7 +331,10 @@ impl Endpoint {
}
};

debug!("sending stateless reset for {} to {}", dst_cid, remote);
debug!(
"sending stateless reset for {} to {}",
dst_cid, addresses.remote
);
let mut buf = Vec::<u8>::new();
// Resets with at least this much padding can't possibly be distinguished from real packets
const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
Expand All @@ -349,11 +352,11 @@ impl Endpoint {
debug_assert!(buf.len() < inciting_dgram_len);

self.transmits.push_back(Transmit {
destination: remote,
destination: addresses.remote,
ecn: None,
contents: buf,
segment_size: None,
src_ip: local_ip,
src_ip: addresses.local_ip,
});
}

Expand Down Expand Up @@ -391,8 +394,10 @@ impl Endpoint {
remote_id,
loc_cid,
remote_id,
remote,
None,
FourTuple {
remote,
local_ip: None,
},
Instant::now(),
tls,
None,
Expand Down Expand Up @@ -437,8 +442,7 @@ impl Endpoint {
fn handle_first_packet(
&mut self,
now: Instant,
remote: SocketAddr,
local_ip: Option<IpAddr>,
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
mut packet: Packet,
rest: Option<BytesMut>,
Expand Down Expand Up @@ -484,8 +488,7 @@ impl Endpoint {
debug!("refusing connection");
self.initial_close(
version,
remote,
local_ip,
addresses,
crypto,
&src_cid,
&loc_cid,
Expand All @@ -503,8 +506,7 @@ impl Endpoint {
);
self.initial_close(
version,
remote,
local_ip,
addresses,
crypto,
&src_cid,
&loc_cid,
Expand All @@ -524,7 +526,7 @@ impl Endpoint {
issued: SystemTime::now(),
random_bytes: &random_bytes,
}
.encode(&*server_config.token_key, &remote, &loc_cid);
.encode(&*server_config.token_key, &addresses.remote, &loc_cid);

let header = Header::Retry {
src_cid: loc_cid,
Expand All @@ -539,16 +541,21 @@ impl Endpoint {
encode.finish(&mut buf, &*crypto.header.local, None);

self.transmits.push_back(Transmit {
destination: remote,
destination: addresses.remote,
ecn: None,
contents: buf,
segment_size: None,
src_ip: local_ip,
src_ip: addresses.local_ip,
});
return None;
}

match RetryToken::from_bytes(&*server_config.token_key, &remote, &dst_cid, &token) {
match RetryToken::from_bytes(
&*server_config.token_key,
&addresses.remote,
&dst_cid,
&token,
) {
Ok(token)
if token.issued + server_config.retry_token_lifetime > SystemTime::now() =>
{
Expand All @@ -558,8 +565,7 @@ impl Endpoint {
debug!("rejecting invalid stateless retry token");
self.initial_close(
version,
remote,
local_ip,
addresses,
crypto,
&src_cid,
&loc_cid,
Expand Down Expand Up @@ -591,8 +597,7 @@ impl Endpoint {
dst_cid,
loc_cid,
src_cid,
remote,
local_ip,
addresses,
now,
tls,
Some(server_config),
Expand All @@ -601,7 +606,14 @@ impl Endpoint {
if dst_cid.len() != 0 {
self.connection_ids_initial.insert(dst_cid, ch);
}
match conn.handle_first_packet(now, remote, ecn, packet_number as u64, packet, rest) {
match conn.handle_first_packet(
now,
addresses.remote,
ecn,
packet_number as u64,
packet,
rest,
) {
Ok(()) => {
trace!(id = ch.0, icid = %dst_cid, "connection incoming");
Some((ch, conn))
Expand All @@ -610,7 +622,7 @@ impl Endpoint {
debug!("handshake failed: {}", e);
self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
if let ConnectionError::TransportError(e) = e {
self.initial_close(version, remote, local_ip, crypto, &src_cid, &loc_cid, e);
self.initial_close(version, addresses, crypto, &src_cid, &loc_cid, e);
}
None
}
Expand All @@ -623,8 +635,7 @@ impl Endpoint {
init_cid: ConnectionId,
loc_cid: ConnectionId,
rem_cid: ConnectionId,
remote: SocketAddr,
local_ip: Option<IpAddr>,
addresses: FourTuple,
now: Instant,
tls: Box<dyn crypto::Session>,
server_config: Option<Arc<ServerConfig>>,
Expand All @@ -637,8 +648,8 @@ impl Endpoint {
init_cid,
loc_cid,
rem_cid,
remote,
local_ip,
addresses.remote,
addresses.local_ip,
tls,
self.local_cid_generator.as_ref(),
now,
Expand All @@ -649,13 +660,13 @@ impl Endpoint {
init_cid,
cids_issued: 0,
loc_cids: iter::once((0, loc_cid)).collect(),
initial_remote: remote,
addresses,
reset_token: None,
});

let ch = ConnectionHandle(id);
match self.local_cid_generator.cid_len() {
0 => self.connection_remotes.insert(remote, ch),
0 => self.connection_remotes.insert(addresses, ch),
_ => self.connection_ids.insert(loc_cid, ch),
};

Expand All @@ -665,8 +676,7 @@ impl Endpoint {
fn initial_close(
&mut self,
version: u32,
destination: SocketAddr,
local_ip: Option<IpAddr>,
addresses: FourTuple,
crypto: &Keys,
remote_id: &ConnectionId,
local_id: &ConnectionId,
Expand Down Expand Up @@ -694,11 +704,11 @@ impl Endpoint {
Some((0, &*crypto.packet.local)),
);
self.transmits.push_back(Transmit {
destination,
destination: addresses.remote,
ecn: None,
contents: buf,
segment_size: None,
src_ip: local_ip,
src_ip: addresses.local_ip,
})
}

Expand Down Expand Up @@ -765,11 +775,11 @@ pub(crate) struct ConnectionMeta {
/// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
cids_issued: u64,
loc_cids: FxHashMap<u64, ConnectionId>,
/// Remote address the connection began with
/// Remote/local addresses the connection began with
///
/// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
/// bother keeping it up to date.
initial_remote: SocketAddr,
addresses: FourTuple,
/// Reset token provided by the peer for the CID we're currently sending to, and the address
/// being sent to
reset_token: Option<(SocketAddr, ResetToken)>,
Expand Down Expand Up @@ -874,3 +884,14 @@ impl ResetTokenTable {
self.0.get(&remote)?.get(&token)
}
}

/// Identifies a connection by the combination of remote and local addresses
///
/// Including the local ensures good behavior when the host has multiple IP addresses on the same
/// subnet and zero-length connection IDs are in use.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
remote: SocketAddr,
// A single socket can only listen on a single port, so no need to store it explicitly
local_ip: Option<IpAddr>,
}