Skip to content

Commit

Permalink
Bidirectional quic communication support (#29155)
Browse files Browse the repository at this point in the history
* Support bi-directional quic communication, use the same endpoint for the quic server and client
This is needed for supporting using quic for repair

* Added comments on the bi-directional communication tests

* Removed some debug logs

* clippy issue
  • Loading branch information
lijunwangs committed Dec 9, 2022
1 parent 6a90abd commit ecea802
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 34 deletions.
1 change: 1 addition & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl ConnectionCache {
if self.use_quic() && !force_use_udp {
Some(Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
None,
)))
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Tpu {
let (verified_sender, verified_receiver) = unbounded();

let stats = Arc::new(StreamStats::default());
let tpu_quic_t = spawn_server(
let (_, tpu_quic_t) = spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
Expand All @@ -172,7 +172,7 @@ impl Tpu {
)
.unwrap();

let tpu_forwards_quic_t = spawn_server(
let (_, tpu_forwards_quic_t) = spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
Expand Down
1 change: 1 addition & 0 deletions quic-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl QuicConfig {
fn create_endpoint(&self) -> Arc<QuicLazyInitializedEndpoint> {
Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
None,
))
}

Expand Down
39 changes: 26 additions & 13 deletions quic-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct QuicClientCertificate {
pub struct QuicLazyInitializedEndpoint {
endpoint: RwLock<Option<Arc<Endpoint>>>,
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
}

#[derive(Error, Debug)]
Expand All @@ -90,19 +91,30 @@ impl From<QuicError> for ClientErrorKind {
}

impl QuicLazyInitializedEndpoint {
pub fn new(client_certificate: Arc<QuicClientCertificate>) -> Self {
pub fn new(
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
) -> Self {
Self {
endpoint: RwLock::new(None),
client_certificate,
client_endpoint,
}
}

fn create_endpoint(&self) -> Endpoint {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range");
let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
endpoint.clone()
} else {
let client_socket = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;

QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};

let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
Expand All @@ -115,9 +127,6 @@ impl QuicLazyInitializedEndpoint {
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];

let mut endpoint =
QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket);

let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport)
.expect("QuicLazyInitializedEndpoint::create_endpoint Arc::get_mut");
Expand All @@ -126,6 +135,7 @@ impl QuicLazyInitializedEndpoint {
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));

endpoint.set_default_client_config(config);

endpoint
}

Expand Down Expand Up @@ -160,10 +170,13 @@ impl Default for QuicLazyInitializedEndpoint {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to create QUIC client certificate");
Self::new(Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
}))
Self::new(
Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
}),
None,
)
}
}

Expand Down
152 changes: 147 additions & 5 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
mod tests {
use {
crossbeam_channel::{unbounded, Receiver},
log::*,
solana_perf::packet::PacketBatch,
solana_quic_client::nonblocking::quic_client::QuicLazyInitializedEndpoint,
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{quic::StreamStats, streamer::StakedNodes},
solana_streamer::{
quic::StreamStats, streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate_chain,
},
solana_tpu_client::connection_cache_stats::ConnectionCacheStats,
std::{
net::{IpAddr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -68,7 +74,7 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let t = solana_streamer::quic::spawn_server(
let (_, t) = solana_streamer::quic::spawn_server(
s.try_clone().unwrap(),
&keypair,
ip,
Expand Down Expand Up @@ -115,7 +121,7 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let t = solana_streamer::nonblocking::quic::spawn_server(
let (_, t) = solana_streamer::nonblocking::quic::spawn_server(
s.try_clone().unwrap(),
&keypair,
ip,
Expand Down Expand Up @@ -151,4 +157,140 @@ mod tests {
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}

#[test]
fn test_quic_bi_direction() {
/// This tests bi-directional quic communication. There are the following components
/// The request receiver -- responsible for receiving requests
/// The request sender -- responsible sending requests to the request reciever using quic
/// The response receiver -- responsible for receiving the responses to the requests
/// The response sender -- responsible for sending responses to the response receiver.
/// In this we demonstrate that the request sender and the response receiver use the
/// same quic Endpoint, and the same UDP socket.
use {
solana_quic_client::quic_client::QuicTpuConnection,
solana_tpu_client::tpu_connection::TpuConnection,
};
solana_logger::setup();

// Request Receiver
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip, request_recv_stats) =
server_args();
let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server(
request_recv_socket.try_clone().unwrap(),
&keypair,
request_recv_ip,
sender,
request_recv_exit.clone(),
1,
staked_nodes.clone(),
10,
10,
request_recv_stats,
1000,
)
.unwrap();

drop(request_recv_endpoint);
// Response Receiver:
let (
response_recv_socket,
response_recv_exit,
keypair2,
response_recv_ip,
response_recv_stats,
) = server_args();
let (sender2, receiver2) = unbounded();

let addr = response_recv_socket.local_addr().unwrap().ip();
let port = response_recv_socket.local_addr().unwrap().port();
let server_addr = SocketAddr::new(addr, port);
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
response_recv_socket,
&keypair2,
response_recv_ip,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
response_recv_stats,
1000,
)
.unwrap();

// Request Sender, it uses the same endpoint as the response receiver:
let addr = request_recv_socket.local_addr().unwrap().ip();
let port = request_recv_socket.local_addr().unwrap().port();
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());

let (certs, priv_key) = new_self_signed_tls_certificate_chain(
&Keypair::new(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let client_certificate = Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
});

let endpoint =
QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint));
let request_sender =
QuicTpuConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats);
// Send a full size packet with single byte writes as a request.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];

assert!(request_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
info!("Received requests!");

// Response sender
let (certs, priv_key) = new_self_signed_tls_certificate_chain(
&Keypair::new(),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
)
.expect("Failed to initialize QUIC client certificates");

let client_certificate2 = Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
});

let endpoint2 = QuicLazyInitializedEndpoint::new(client_certificate2, None);
let connection_cache_stats2 = Arc::new(ConnectionCacheStats::default());
let response_sender =
QuicTpuConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2);

// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];

assert!(response_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
check_packets(receiver2, num_bytes, num_expected_packets);
info!("Received responses!");

// Drop the clients explicitly to avoid hung on drops
drop(request_sender);
drop(response_sender);

request_recv_exit.store(true, Ordering::Relaxed);
request_recv_thread.join().unwrap();
info!("Request receiver exited!");

response_recv_exit.store(true, Ordering::Relaxed);
response_recv_thread.join().unwrap();
info!("Response receiver exited!");
}
}
16 changes: 10 additions & 6 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ pub fn spawn_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) -> Result<JoinHandle<()>, QuicServerError> {
) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
info!("Start quic server on {:?}", sock);
let (config, _cert) = configure_server(keypair, gossip_host)?;

let (_, incoming) = {
let (endpoint, incoming) = {
Endpoint::new(EndpointConfig::default(), Some(config), sock)
.map_err(|_e| QuicServerError::EndpointFailed)?
};
Expand All @@ -90,7 +91,7 @@ pub fn spawn_server(
stats,
wait_for_chunk_timeout_ms,
));
Ok(handle)
Ok((endpoint, handle))
}

pub async fn run_server(
Expand Down Expand Up @@ -126,6 +127,7 @@ pub async fn run_server(
}

if let Ok(Some(connection)) = timeout_connection {
info!("Got a connection {:?}", connection.remote_address());
tokio::spawn(setup_connection(
connection,
unstaked_connection_table.clone(),
Expand All @@ -139,6 +141,8 @@ pub async fn run_server(
wait_for_chunk_timeout_ms,
));
sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await;
} else {
info!("Timed out waiting for connection");
}
}
}
Expand Down Expand Up @@ -1005,7 +1009,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
Expand Down Expand Up @@ -1317,7 +1321,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
Expand Down Expand Up @@ -1348,7 +1352,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
Expand Down

0 comments on commit ecea802

Please sign in to comment.