diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 6faba3511d..017802a140 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -327,23 +327,8 @@ impl Connection { /// consequence, the peer won't be notified that a stream has been opened until the stream is /// actually used. pub async fn open_uni(&self) -> Result { - loop { - let opening; - { - let mut conn = self.0.lock("open_uni"); - if let Some(ref e) = conn.error { - return Err(e.clone()); - } - if let Some(id) = conn.inner.streams().open(Dir::Uni) { - let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking(); - drop(conn); // Release lock for clone - return Ok(SendStream::new(self.0.clone(), id, is_0rtt)); - } - opening = conn.uni_opening.clone(); - opening.notified() // Must be called, but not awaited, while the connection lock is held - } - .await - } + let (id, is_0rtt) = self.open(Dir::Uni).await?; + Ok(SendStream::new(self.0.clone(), id, is_0rtt)) } /// Initiate a new outgoing bidirectional stream. @@ -352,22 +337,26 @@ impl Connection { /// consequence, the peer won't be notified that a stream has been opened until the stream is /// actually used. pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> { + let (id, is_0rtt) = self.open(Dir::Bi).await?; + Ok(( + SendStream::new(self.0.clone(), id, is_0rtt), + RecvStream::new(self.0.clone(), id, is_0rtt), + )) + } + + async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> { loop { let opening; { - let mut conn = self.0.lock("open_bi"); + let mut conn = self.0.lock("open"); if let Some(ref e) = conn.error { return Err(e.clone()); } - if let Some(id) = conn.inner.streams().open(Dir::Bi) { + if let Some(id) = conn.inner.streams().open(dir) { let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking(); - drop(conn); // Release lock for clone - return Ok(( - SendStream::new(self.0.clone(), id, is_0rtt), - RecvStream::new(self.0.clone(), id, is_0rtt), - )); + return Ok((id, is_0rtt)); } - opening = conn.bi_opening.clone(); + opening = conn.stream_opening[dir as usize].clone(); opening.notified() // Must be called, but not awaited, while the connection lock is held } .await; @@ -677,8 +666,7 @@ impl ConnectionRef { endpoint_events, blocked_writers: FxHashMap::default(), blocked_readers: FxHashMap::default(), - uni_opening: Arc::new(Notify::new()), - bi_opening: Arc::new(Notify::new()), + stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())], incoming_uni_streams_reader: None, incoming_bi_streams_reader: None, datagram_reader: None, @@ -738,8 +726,7 @@ pub struct ConnectionInner { endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>, pub(crate) blocked_writers: FxHashMap, pub(crate) blocked_readers: FxHashMap, - uni_opening: Arc, - bi_opening: Arc, + stream_opening: [Arc; 2], incoming_uni_streams_reader: Option, incoming_bi_streams_reader: Option, datagram_reader: Option, @@ -862,11 +849,7 @@ impl ConnectionInner { } } Stream(StreamEvent::Available { dir }) => { - let tasks = match dir { - Dir::Uni => &mut self.uni_opening, - Dir::Bi => &mut self.bi_opening, - }; - tasks.notify_one(); + self.stream_opening[dir as usize].notify_one(); } Stream(StreamEvent::Finished { id }) => { if let Some(finishing) = self.finishing.remove(&id) { @@ -958,8 +941,8 @@ impl ConnectionInner { for (_, reader) in self.blocked_readers.drain() { reader.wake() } - self.uni_opening.notify_waiters(); - self.bi_opening.notify_waiters(); + self.stream_opening[Dir::Uni as usize].notify_waiters(); + self.stream_opening[Dir::Bi as usize].notify_waiters(); if let Some(x) = self.incoming_uni_streams_reader.take() { x.wake(); }