Skip to content

Commit

Permalink
Define Connection::closed helper to await connection termination
Browse files Browse the repository at this point in the history
This has come up several times, most recently in
#1395. While equivalent
behavior is already possible by waiting for e.g. `IncomingUniStreams`
to yield `Err` or `None`, this is substantially more discoverable.
  • Loading branch information
Ralith committed Aug 15, 2022
1 parent f7203ee commit 7a2e014
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,26 @@ impl Connection {
}
}

/// Wait for the connection to be closed for any reason
pub async fn closed(&self) -> ConnectionError {
let closed;
{
let conn = self.0.lock("closed");
closed = conn.closed.clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
closed.notified()
}
.await;
self.0
.lock("closed")
.error
.as_ref()
.expect("closed without an error")
.clone()
}

/// Close the connection immediately.
///
/// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. Delivery
Expand Down Expand Up @@ -731,6 +751,7 @@ impl ConnectionRef {
datagram_reader: None,
finishing: FxHashMap::default(),
stopped: FxHashMap::default(),
closed: Arc::new(Notify::new()),
error: None,
ref_count: 0,
udp_state,
Expand Down Expand Up @@ -792,6 +813,7 @@ pub struct ConnectionInner {
datagram_reader: Option<Waker>,
pub(crate) finishing: FxHashMap<StreamId, oneshot::Sender<Option<WriteError>>>,
pub(crate) stopped: FxHashMap<StreamId, Waker>,
closed: Arc<Notify>,
/// Always set to Some before the connection becomes drained
pub(crate) error: Option<ConnectionError>,
/// Number of live handles that can be used to initiate or handle I/O; excludes the driver
Expand Down Expand Up @@ -1022,6 +1044,7 @@ impl ConnectionInner {
for (_, waker) in self.stopped.drain() {
waker.wake();
}
self.closed.notify_waiters();
}

fn close(&mut self, error_code: VarInt, reason: Bytes) {
Expand Down

0 comments on commit 7a2e014

Please sign in to comment.