Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Broadcast helper with tokio::sync::Notify #1264

Merged
merged 4 commits into from Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 0 additions & 62 deletions quinn/src/broadcast.rs

This file was deleted.

114 changes: 38 additions & 76 deletions quinn/src/connection.rs
Expand Up @@ -16,12 +16,12 @@ use futures_core::Stream;
use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId};
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::Notify;
use tokio::time::{sleep_until, Instant as TokioInstant, Sleep};
use tracing::info_span;
use udp::UdpState;

use crate::{
broadcast::{self, Broadcast},
mutex::Mutex,
poll_fn,
recv_stream::RecvStream,
Expand Down Expand Up @@ -324,22 +324,45 @@ impl Connection {
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub fn open_uni(&self) -> OpenUni {
OpenUni {
conn: self.0.clone(),
state: broadcast::State::default(),
}
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
Ralith marked this conversation as resolved.
Show resolved Hide resolved
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
}

/// Initiate a new outgoing bidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub fn open_bi(&self) -> OpenBi {
OpenBi {
conn: self.0.clone(),
state: broadcast::State::default(),
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
return Ok((id, is_0rtt));
}
// Clone the `Arc<Notify>` so we can wait on the underlying `Notify` without holding
// the lock. Store it in the outer scope to ensure it outlives the lock guard.
opening = conn.stream_opening[dir as usize].clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
opening.notified()
}
.await
}
}

Expand Down Expand Up @@ -620,61 +643,6 @@ impl Stream for Datagrams {
}
}

/// A future that will resolve into an opened outgoing unidirectional stream
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
pub struct OpenUni {
conn: ConnectionRef,
state: broadcast::State,
}

impl Future for OpenUni {
type Output = Result<SendStream, ConnectionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
let mut conn = this.conn.lock("OpenUni::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Uni) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok(SendStream::new(this.conn.clone(), id, is_0rtt)));
}
conn.uni_opening.register(cx, &mut this.state);
Poll::Pending
}
}

/// A future that will resolve into an opened outgoing bidirectional stream
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
pub struct OpenBi {
conn: ConnectionRef,
state: broadcast::State,
}

impl Future for OpenBi {
type Output = Result<(SendStream, RecvStream), ConnectionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
let mut conn = this.conn.lock("OpenBi::next");
if let Some(ref e) = conn.error {
return Poll::Ready(Err(e.clone()));
}
if let Some(id) = conn.inner.streams().open(Dir::Bi) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Poll::Ready(Ok((
SendStream::new(this.conn.clone(), id, is_0rtt),
RecvStream::new(this.conn.clone(), id, is_0rtt),
)));
}
conn.bi_opening.register(cx, &mut this.state);
Poll::Pending
}
}

#[derive(Debug)]
pub struct ConnectionRef(Arc<Mutex<ConnectionInner>>);

Expand All @@ -701,8 +669,7 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
uni_opening: Broadcast::new(),
bi_opening: Broadcast::new(),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
incoming_uni_streams_reader: None,
incoming_bi_streams_reader: None,
datagram_reader: None,
Expand Down Expand Up @@ -762,8 +729,7 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
uni_opening: Broadcast,
bi_opening: Broadcast,
stream_opening: [Arc<Notify>; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
datagram_reader: Option<Waker>,
Expand Down Expand Up @@ -886,11 +852,7 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Available { dir }) => {
let tasks = match dir {
Dir::Uni => &mut self.uni_opening,
Dir::Bi => &mut self.bi_opening,
};
tasks.wake();
self.stream_opening[dir as usize].notify_one();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -982,8 +944,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.uni_opening.wake();
self.bi_opening.wake();
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
Expand Down
40 changes: 22 additions & 18 deletions quinn/src/endpoint.rs
Expand Up @@ -19,15 +19,12 @@ use proto::{
self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig,
};
use rustc_hash::FxHashMap;
use tokio::sync::Notify;
use udp::{RecvMeta, UdpSocket, UdpState, BATCH_SIZE};

use crate::{
broadcast::{self, Broadcast},
connection::Connecting,
poll_fn,
work_limiter::WorkLimiter,
ConnectionEvent, EndpointConfig, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND,
SEND_TIME_BOUND,
connection::Connecting, poll_fn, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig,
EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, SEND_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -226,16 +223,23 @@ impl Endpoint {
/// [`close()`]: Endpoint::close
/// [`Incoming`]: crate::Incoming
pub async fn wait_idle(&self) {
let mut state = broadcast::State::default();
poll_fn(move |cx| {
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
return Poll::Ready(());
loop {
let idle;
{
let endpoint = &mut *self.inner.lock().unwrap();
if endpoint.connections.is_empty() {
break;
}
// Clone the `Arc<Notify>` so we can wait on the underlying `Notify` without holding
// the lock. Store it in the outer scope to ensure it outlives the lock guard.
idle = endpoint.idle.clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
idle.notified()
}
endpoint.idle.register(cx, &mut state);
Poll::Pending
})
.await;
.await;
}
}
}

Expand Down Expand Up @@ -321,7 +325,7 @@ pub(crate) struct EndpointInner {
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
idle: Broadcast,
idle: Arc<Notify>,
}

impl EndpointInner {
Expand Down Expand Up @@ -442,7 +446,7 @@ impl EndpointInner {
if e.is_drained() {
self.connections.senders.remove(&ch);
if self.connections.is_empty() {
self.idle.wake();
self.idle.notify_waiters();
}
}
if let Some(event) = self.inner.handle_event(ch, e) {
Expand Down Expand Up @@ -581,7 +585,7 @@ impl EndpointRef {
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
idle: Broadcast::new(),
idle: Arc::new(Notify::new()),
})))
}
}
Expand Down
3 changes: 1 addition & 2 deletions quinn/src/lib.rs
Expand Up @@ -55,7 +55,6 @@ macro_rules! ready {
};
}

mod broadcast;
mod connection;
mod endpoint;
mod mutex;
Expand All @@ -71,7 +70,7 @@ pub use proto::{

pub use crate::connection::{
Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, NewConnection,
OpenBi, OpenUni, SendDatagramError, UnknownStream, ZeroRttAccepted,
SendDatagramError, UnknownStream, ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down