Skip to content

Commit

Permalink
Replace Broadcast helper with tokio::sync::Notify
Browse files Browse the repository at this point in the history
Drops a chunk of complex and under-audited synchronization code in
favor of a similar, better-tested and more efficient primitive from
tokio.
  • Loading branch information
Ralith committed Dec 22, 2021
1 parent 78d5c78 commit 492e6df
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 154 deletions.
2 changes: 1 addition & 1 deletion quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", def
rustls = { version = "0.20", default-features = false, features = ["quic"], optional = true }
thiserror = "1.0.21"
tracing = "0.1.10"
tokio = { version = "1.0.1", features = ["rt", "time"] }
tokio = { version = "1.0.1", features = ["rt", "time", "sync"] }
udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.1.0" }
webpki = { version = "0.22", default-features = false, optional = true }

Expand Down
62 changes: 0 additions & 62 deletions quinn/src/broadcast.rs

This file was deleted.

118 changes: 46 additions & 72 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use futures_core::Stream;
use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId};
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::time::{sleep_until, Instant as TokioInstant, Sleep};
use tokio::{
sync::Notify,
time::{sleep_until, Instant as TokioInstant, Sleep},
};
use tracing::info_span;
use udp::UdpState;

use crate::{
broadcast::{self, Broadcast},
mutex::Mutex,
poll_fn,
recv_stream::RecvStream,
Expand Down Expand Up @@ -325,11 +327,23 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
OpenUni {
conn: self.0.clone(),
state: broadcast::State::default(),
loop {
let opening;
{
let mut conn = self.0.lock("open_uni");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(Dir::Uni) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Ok(SendStream::new(self.0.clone(), id, is_0rtt));
}
opening = conn.uni_opening.clone();
opening.notified() // Must be called, but not awaited, while the connection lock is held
}
.await
}
.await
}

/// Initiate a new outgoing bidirectional stream.
Expand All @@ -338,11 +352,26 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
OpenBi {
conn: self.0.clone(),
state: broadcast::State::default(),
loop {
let opening;
{
let mut conn = self.0.lock("open_bi");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(Dir::Bi) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
));
}
opening = conn.bi_opening.clone();
opening.notified() // Must be called, but not awaited, while the connection lock is held
}
.await;
}
.await
}

/// Close the connection immediately.
Expand Down Expand Up @@ -622,61 +651,6 @@ impl Stream for Datagrams {
}
}

/// A future that will resolve into an opened outgoing unidirectional stream
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
struct OpenUni {
conn: ConnectionRef,
state: broadcast::State,
}

impl Future for OpenUni {
type Output = Result<SendStream, ConnectionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
let mut conn = this.conn.lock("OpenUni::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Uni) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok(SendStream::new(this.conn.clone(), id, is_0rtt)));
}
conn.uni_opening.register(cx, &mut this.state);
Poll::Pending
}
}

/// A future that will resolve into an opened outgoing bidirectional stream
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
struct OpenBi {
conn: ConnectionRef,
state: broadcast::State,
}

impl Future for OpenBi {
type Output = Result<(SendStream, RecvStream), ConnectionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
let mut conn = this.conn.lock("OpenBi::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Bi) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok((
SendStream::new(this.conn.clone(), id, is_0rtt),
RecvStream::new(this.conn.clone(), id, is_0rtt),
)));
}
conn.bi_opening.register(cx, &mut this.state);
Poll::Pending
}
}

#[derive(Debug)]
pub struct ConnectionRef(Arc<Mutex<ConnectionInner>>);

Expand All @@ -703,8 +677,8 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
uni_opening: Broadcast::new(),
bi_opening: Broadcast::new(),
uni_opening: Arc::new(Notify::new()),
bi_opening: Arc::new(Notify::new()),
incoming_uni_streams_reader: None,
incoming_bi_streams_reader: None,
datagram_reader: None,
Expand Down Expand Up @@ -764,8 +738,8 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
uni_opening: Broadcast,
bi_opening: Broadcast,
uni_opening: Arc<Notify>,
bi_opening: Arc<Notify>,
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
datagram_reader: Option<Waker>,
Expand Down Expand Up @@ -892,7 +866,7 @@ impl ConnectionInner {
Dir::Uni => &mut self.uni_opening,
Dir::Bi => &mut self.bi_opening,
};
tasks.wake();
tasks.notify_one();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -984,8 +958,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.uni_opening.wake();
self.bi_opening.wake();
self.uni_opening.notify_waiters();
self.bi_opening.notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
Expand Down
36 changes: 18 additions & 18 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ use proto::{
self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig,
};
use rustc_hash::FxHashMap;
use tokio::sync::Notify;
use udp::{RecvMeta, UdpSocket, UdpState, BATCH_SIZE};

use crate::{
broadcast::{self, Broadcast},
connection::Connecting,
poll_fn,
work_limiter::WorkLimiter,
ConnectionEvent, EndpointConfig, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND,
SEND_TIME_BOUND,
connection::Connecting, poll_fn, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig,
EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, SEND_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -226,16 +223,19 @@ impl Endpoint {
/// [`close()`]: Endpoint::close
/// [`Incoming`]: crate::Incoming
pub async fn wait_idle(&self) {
let mut state = broadcast::State::default();
poll_fn(move |cx| {
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
return Poll::Ready(());
loop {
let idle;
{
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
break;
}
idle = endpoint.idle.clone();
// Must be called, but not awaited, while the endpoint lock is held
idle.notified()
}
endpoint.idle.register(cx, &mut state);
Poll::Pending
})
.await;
.await;
}
}
}

Expand Down Expand Up @@ -321,7 +321,7 @@ pub(crate) struct EndpointInner {
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
idle: Broadcast,
idle: Arc<Notify>,
}

impl EndpointInner {
Expand Down Expand Up @@ -442,7 +442,7 @@ impl EndpointInner {
if e.is_drained() {
self.connections.senders.remove(&ch);
if self.connections.is_empty() {
self.idle.wake();
self.idle.notify_waiters();
}
}
if let Some(event) = self.inner.handle_event(ch, e) {
Expand Down Expand Up @@ -581,7 +581,7 @@ impl EndpointRef {
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
idle: Broadcast::new(),
idle: Arc::new(Notify::new()),
})))
}
}
Expand Down
1 change: 0 additions & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ macro_rules! ready {
};
}

mod broadcast;
mod connection;
mod endpoint;
mod mutex;
Expand Down

0 comments on commit 492e6df

Please sign in to comment.