diff --git a/transports/quic/src/muxer.rs b/transports/quic/src/muxer.rs index 9fe6e7e4cd9..aaa0a7edcbe 100644 --- a/transports/quic/src/muxer.rs +++ b/transports/quic/src/muxer.rs @@ -22,7 +22,7 @@ use crate::connection::{Connection, ConnectionEvent}; use crate::error::Error; use futures::{AsyncRead, AsyncWrite}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::StreamMuxer; use parking_lot::Mutex; use std::{ collections::{HashMap, VecDeque}, @@ -54,6 +54,60 @@ struct QuicMuxerInner { poll_event_waker: Option, } +impl QuicMuxerInner { + fn poll_connection(&mut self, cx: &mut Context<'_>) { + while let Poll::Ready(event) = self.connection.poll_event(cx) { + match event { + ConnectionEvent::Connected => { + tracing::error!("Unexpected Connected event on established QUIC connection"); + } + ConnectionEvent::ConnectionLost(_) => { + if let Some(waker) = self.poll_close_waker.take() { + waker.wake(); + } + self.connection.close(); + } + + ConnectionEvent::StreamOpened => { + if let Some(waker) = self.pending_substreams.pop_front() { + waker.wake(); + } + } + ConnectionEvent::StreamReadable(substream) => { + if let Some(substream) = self.substreams.get_mut(&substream) { + if let Some(waker) = substream.read_waker.take() { + waker.wake(); + } + } + } + ConnectionEvent::StreamWritable(substream) => { + if let Some(substream) = self.substreams.get_mut(&substream) { + if let Some(waker) = substream.write_waker.take() { + waker.wake(); + } + } + } + ConnectionEvent::StreamFinished(substream) => { + if let Some(substream) = self.substreams.get_mut(&substream) { + substream.finished = true; + if let Some(waker) = substream.finished_waker.take() { + waker.wake(); + } + } + } + ConnectionEvent::StreamStopped(substream) => { + if let Some(substream) = self.substreams.get_mut(&substream) { + substream.stopped = true; + } + } + ConnectionEvent::StreamAvailable => { + // Handled below. + } + } + } + } +} + /// State of a single substream. #[derive(Default, Clone)] struct SubstreamState { @@ -89,6 +143,93 @@ impl QuicMuxer { } } } +impl StreamMuxer for QuicMuxer { + type Substream = Substream; + type Error = Error; + + fn poll_address_change( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.inner.lock().poll_connection(cx); + // TODO + Poll::Pending + } + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut inner = self.inner.lock(); + inner.poll_connection(cx); + if let Some(substream_id) = inner.connection.pop_incoming_substream() { + inner.substreams.insert(substream_id, Default::default()); + let substream = Substream::new(substream_id, self.inner.clone()); + Poll::Ready(Ok(substream)) + } else { + inner.poll_event_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut inner = self.inner.lock(); + inner.poll_connection(cx); + if let Some(substream_id) = inner.connection.pop_outgoing_substream() { + inner.substreams.insert(substream_id, Default::default()); + let substream = Substream::new(substream_id, self.inner.clone()); + Poll::Ready(Ok(substream)) + } else { + inner.pending_substreams.push_back(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock(); + inner.poll_connection(cx); + + if inner.connection.connection.is_drained() { + return Poll::Ready(Ok(())); + } + + if inner.substreams.is_empty() { + let connection = &mut inner.connection; + if !connection.connection.is_closed() { + connection.close(); + if let Some(waker) = inner.poll_event_waker.take() { + waker.wake(); + } + } else { + } + while let Poll::Ready(event) = inner.connection.poll_event(cx) { + if let ConnectionEvent::ConnectionLost(_) = event { + return Poll::Ready(Ok(())); + } + } + } else { + for substream in inner.substreams.clone().keys() { + if let Err(e) = inner.connection.shutdown_substream(*substream) { + tracing::error!("substream finish error on muxer close: {}", e); + } + } + } + + // Register `cx.waker()` as being woken up if the connection closes. + inner.poll_close_waker = Some(cx.waker().clone()); + + Poll::Pending + } +} + +impl fmt::Debug for QuicMuxer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("QuicMuxer").finish() + } +} pub struct Substream { id: quinn_proto::StreamId, @@ -270,163 +411,3 @@ impl AsyncWrite for Substream { } } } - -impl StreamMuxer for QuicMuxer { - type OutboundSubstream = (); - type Substream = Substream; - type Error = Error; - - /// Polls for a connection-wide event. - /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. - /// - /// It is permissible and common to use this method to perform background - /// work, such as processing incoming packets and polling timers. - /// - /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - // We use `poll_event` to perform the background processing of the entire connection. - let mut inner = self.inner.lock(); - - while let Poll::Ready(event) = inner.connection.poll_event(cx) { - match event { - ConnectionEvent::Connected => { - tracing::error!("Unexpected Connected event on established QUIC connection"); - } - ConnectionEvent::ConnectionLost(_) => { - if let Some(waker) = inner.poll_close_waker.take() { - waker.wake(); - } - inner.connection.close(); - } - - ConnectionEvent::StreamOpened => { - if let Some(waker) = inner.pending_substreams.pop_front() { - waker.wake(); - } - } - ConnectionEvent::StreamReadable(substream) => { - if let Some(substream) = inner.substreams.get_mut(&substream) { - if let Some(waker) = substream.read_waker.take() { - waker.wake(); - } - } - } - ConnectionEvent::StreamWritable(substream) => { - if let Some(substream) = inner.substreams.get_mut(&substream) { - if let Some(waker) = substream.write_waker.take() { - waker.wake(); - } - } - } - ConnectionEvent::StreamFinished(substream) => { - if let Some(substream) = inner.substreams.get_mut(&substream) { - substream.finished = true; - if let Some(waker) = substream.finished_waker.take() { - waker.wake(); - } - } - } - ConnectionEvent::StreamStopped(substream) => { - if let Some(substream) = inner.substreams.get_mut(&substream) { - substream.stopped = true; - } - } - ConnectionEvent::StreamAvailable => { - // Handled below. - } - } - } - - if let Some(substream_id) = inner.connection.pop_incoming_substream() { - inner.substreams.insert(substream_id, Default::default()); - let substream = Substream::new(substream_id, self.inner.clone()); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) - } else { - inner.poll_event_waker = Some(cx.waker().clone()); - Poll::Pending - } - } - - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// We provide the same handler to poll it by multiple tasks, which is done as a FIFO - /// queue via `poll_outbound`. - fn open_outbound(&self) -> Self::OutboundSubstream {} - - /// Polls the outbound substream. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut inner = self.inner.lock(); - if let Some(substream_id) = inner.connection.pop_outgoing_substream() { - inner.substreams.insert(substream_id, Default::default()); - let substream = Substream::new(substream_id, self.inner.clone()); - Poll::Ready(Ok(substream)) - } else { - inner.pending_substreams.push_back(cx.waker().clone()); - Poll::Pending - } - } - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - // Do nothing because we don't know which waker should be destroyed. - // TODO `Self::OutboundSubstream` -> autoincrement id. - } - - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.inner.lock(); - - if inner.connection.connection.is_drained() { - return Poll::Ready(Ok(())); - } - - if inner.substreams.is_empty() { - let connection = &mut inner.connection; - if !connection.connection.is_closed() { - connection.close(); - if let Some(waker) = inner.poll_event_waker.take() { - waker.wake(); - } - } else { - } - while let Poll::Ready(event) = inner.connection.poll_event(cx) { - if let ConnectionEvent::ConnectionLost(_) = event { - return Poll::Ready(Ok(())); - } - } - } else { - for substream in inner.substreams.clone().keys() { - if let Err(e) = inner.connection.shutdown_substream(*substream) { - tracing::error!("substream finish error on muxer close: {}", e); - } - } - } - - // Register `cx.waker()` as being woken up if the connection closes. - inner.poll_close_waker = Some(cx.waker().clone()); - - Poll::Pending - } -} - -impl fmt::Debug for QuicMuxer { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("QuicMuxer").finish() - } -}