Skip to content

Commit

Permalink
transports/quic: adapt QuicMuxer to libp2p#2724
Browse files Browse the repository at this point in the history
Discussed in libp2p#2722.
  • Loading branch information
elenaf9 committed Aug 3, 2022
1 parent 07c0dba commit 57840a3
Showing 1 changed file with 142 additions and 161 deletions.
303 changes: 142 additions & 161 deletions transports/quic/src/muxer.rs
Expand Up @@ -22,7 +22,7 @@ use crate::connection::{Connection, ConnectionEvent};
use crate::error::Error;

use futures::{AsyncRead, AsyncWrite};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::muxing::StreamMuxer;
use parking_lot::Mutex;
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -54,6 +54,60 @@ struct QuicMuxerInner {
poll_event_waker: Option<Waker>,
}

impl QuicMuxerInner {
fn poll_connection(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(event) = self.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected => {
tracing::error!("Unexpected Connected event on established QUIC connection");
}
ConnectionEvent::ConnectionLost(_) => {
if let Some(waker) = self.poll_close_waker.take() {
waker.wake();
}
self.connection.close();
}

ConnectionEvent::StreamOpened => {
if let Some(waker) = self.pending_substreams.pop_front() {
waker.wake();
}
}
ConnectionEvent::StreamReadable(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
if let Some(waker) = substream.read_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamWritable(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamFinished(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
substream.finished = true;
if let Some(waker) = substream.finished_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamStopped(substream) => {
if let Some(substream) = self.substreams.get_mut(&substream) {
substream.stopped = true;
}
}
ConnectionEvent::StreamAvailable => {
// Handled below.
}
}
}
}
}

/// State of a single substream.
#[derive(Default, Clone)]
struct SubstreamState {
Expand Down Expand Up @@ -89,6 +143,93 @@ impl QuicMuxer {
}
}
}
impl StreamMuxer for QuicMuxer {
type Substream = Substream;
type Error = Error;

fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<libp2p_core::Multiaddr, Self::Error>> {
self.inner.lock().poll_connection(cx);
// TODO
Poll::Pending
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);
if let Some(substream_id) = inner.connection.pop_incoming_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.poll_event_waker = Some(cx.waker().clone());
Poll::Pending
}
}

fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);
if let Some(substream_id) = inner.connection.pop_outgoing_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.pending_substreams.push_back(cx.waker().clone());
Poll::Pending
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();
inner.poll_connection(cx);

if inner.connection.connection.is_drained() {
return Poll::Ready(Ok(()));
}

if inner.substreams.is_empty() {
let connection = &mut inner.connection;
if !connection.connection.is_closed() {
connection.close();
if let Some(waker) = inner.poll_event_waker.take() {
waker.wake();
}
} else {
}
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
if let ConnectionEvent::ConnectionLost(_) = event {
return Poll::Ready(Ok(()));
}
}
} else {
for substream in inner.substreams.clone().keys() {
if let Err(e) = inner.connection.shutdown_substream(*substream) {
tracing::error!("substream finish error on muxer close: {}", e);
}
}
}

// Register `cx.waker()` as being woken up if the connection closes.
inner.poll_close_waker = Some(cx.waker().clone());

Poll::Pending
}
}

impl fmt::Debug for QuicMuxer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("QuicMuxer").finish()
}
}

pub struct Substream {
id: quinn_proto::StreamId,
Expand Down Expand Up @@ -270,163 +411,3 @@ impl AsyncWrite for Substream {
}
}
}

impl StreamMuxer for QuicMuxer {
type OutboundSubstream = ();
type Substream = Substream;
type Error = Error;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
// We use `poll_event` to perform the background processing of the entire connection.
let mut inner = self.inner.lock();

while let Poll::Ready(event) = inner.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected => {
tracing::error!("Unexpected Connected event on established QUIC connection");
}
ConnectionEvent::ConnectionLost(_) => {
if let Some(waker) = inner.poll_close_waker.take() {
waker.wake();
}
inner.connection.close();
}

ConnectionEvent::StreamOpened => {
if let Some(waker) = inner.pending_substreams.pop_front() {
waker.wake();
}
}
ConnectionEvent::StreamReadable(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
if let Some(waker) = substream.read_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamWritable(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamFinished(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
substream.finished = true;
if let Some(waker) = substream.finished_waker.take() {
waker.wake();
}
}
}
ConnectionEvent::StreamStopped(substream) => {
if let Some(substream) = inner.substreams.get_mut(&substream) {
substream.stopped = true;
}
}
ConnectionEvent::StreamAvailable => {
// Handled below.
}
}
}

if let Some(substream_id) = inner.connection.pop_incoming_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream)))
} else {
inner.poll_event_waker = Some(cx.waker().clone());
Poll::Pending
}
}

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// We provide the same handler to poll it by multiple tasks, which is done as a FIFO
/// queue via `poll_outbound`.
fn open_outbound(&self) -> Self::OutboundSubstream {}

/// Polls the outbound substream.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut inner = self.inner.lock();
if let Some(substream_id) = inner.connection.pop_outgoing_substream() {
inner.substreams.insert(substream_id, Default::default());
let substream = Substream::new(substream_id, self.inner.clone());
Poll::Ready(Ok(substream))
} else {
inner.pending_substreams.push_back(cx.waker().clone());
Poll::Pending
}
}

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
// Do nothing because we don't know which waker should be destroyed.
// TODO `Self::OutboundSubstream` -> autoincrement id.
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();

if inner.connection.connection.is_drained() {
return Poll::Ready(Ok(()));
}

if inner.substreams.is_empty() {
let connection = &mut inner.connection;
if !connection.connection.is_closed() {
connection.close();
if let Some(waker) = inner.poll_event_waker.take() {
waker.wake();
}
} else {
}
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
if let ConnectionEvent::ConnectionLost(_) = event {
return Poll::Ready(Ok(()));
}
}
} else {
for substream in inner.substreams.clone().keys() {
if let Err(e) = inner.connection.shutdown_substream(*substream) {
tracing::error!("substream finish error on muxer close: {}", e);
}
}
}

// Register `cx.waker()` as being woken up if the connection closes.
inner.poll_close_waker = Some(cx.waker().clone());

Poll::Pending
}
}

impl fmt::Debug for QuicMuxer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("QuicMuxer").finish()
}
}

0 comments on commit 57840a3

Please sign in to comment.