Skip to content

Commit

Permalink
Factor out common stream open logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Dec 22, 2021
1 parent 492e6df commit bb8fe6e
Showing 1 changed file with 19 additions and 36 deletions.
55 changes: 19 additions & 36 deletions quinn/src/connection.rs
Expand Up @@ -327,23 +327,8 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open_uni");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(Dir::Uni) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Ok(SendStream::new(self.0.clone(), id, is_0rtt));
}
opening = conn.uni_opening.clone();
opening.notified() // Must be called, but not awaited, while the connection lock is held
}
.await
}
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
}

/// Initiate a new outgoing bidirectional stream.
Expand All @@ -352,22 +337,26 @@ impl Connection {
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open_bi");
let mut conn = self.0.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(Dir::Bi) {
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release lock for clone
return Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
));
return Ok((id, is_0rtt));
}
opening = conn.bi_opening.clone();
opening = conn.stream_opening[dir as usize].clone();
opening.notified() // Must be called, but not awaited, while the connection lock is held
}
.await;
Expand Down Expand Up @@ -677,8 +666,7 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
uni_opening: Arc::new(Notify::new()),
bi_opening: Arc::new(Notify::new()),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
incoming_uni_streams_reader: None,
incoming_bi_streams_reader: None,
datagram_reader: None,
Expand Down Expand Up @@ -738,8 +726,7 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
uni_opening: Arc<Notify>,
bi_opening: Arc<Notify>,
stream_opening: [Arc<Notify>; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
datagram_reader: Option<Waker>,
Expand Down Expand Up @@ -862,11 +849,7 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Available { dir }) => {
let tasks = match dir {
Dir::Uni => &mut self.uni_opening,
Dir::Bi => &mut self.bi_opening,
};
tasks.notify_one();
self.stream_opening[dir as usize].notify_one();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -958,8 +941,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.uni_opening.notify_waiters();
self.bi_opening.notify_waiters();
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
Expand Down

0 comments on commit bb8fe6e

Please sign in to comment.