Skip to content

Commit

Permalink
quic: submodules for connection
Browse files Browse the repository at this point in the history
- Move substream-related logic into crate::connection::substream
- Rename/ move crate::upgrade -> crate::connection::connecting
  • Loading branch information
elenaf9 committed Oct 16, 2022
1 parent 954908b commit 8034850
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 172 deletions.
176 changes: 8 additions & 168 deletions transports/quic/src/connection.rs
Expand Up @@ -18,19 +18,24 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod connecting;
mod substream;

use crate::{
endpoint::{self, ToEndpoint},
Error,
};
pub use connecting::Connecting;
pub use substream::Substream;
use substream::SubstreamState;

use futures::{channel::mpsc, ready, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures::{channel::mpsc, ready, FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use parking_lot::Mutex;
use std::{
any::Any,
collections::HashMap,
io::{self, Write},
net::SocketAddr,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -380,6 +385,7 @@ pub struct State {

/// State of all the substreams that the muxer reports as open.
pub substreams: HashMap<quinn_proto::StreamId, SubstreamState>,

/// Waker to wake if a new outbound substream is opened.
pub poll_outbound_waker: Option<Waker>,
/// Waker to wake if a new inbound substream was happened.
Expand All @@ -395,169 +401,3 @@ impl State {
.expect("Substream should be known.")
}
}

/// State of a single substream.
#[derive(Debug, Default, Clone)]
pub struct SubstreamState {
/// Waker to wake if the substream becomes readable or stopped.
read_waker: Option<Waker>,
/// Waker to wake if the substream becomes writable or stopped.
write_waker: Option<Waker>,
/// Waker to wake if the substream becomes closed or stopped.
finished_waker: Option<Waker>,

is_write_closed: bool,
}

impl SubstreamState {
fn wake_all(&mut self) {
if let Some(waker) = self.read_waker.take() {
waker.wake();
}
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
if let Some(waker) = self.finished_waker.take() {
waker.wake();
}
}
}

#[derive(Debug)]
pub struct Substream {
id: quinn_proto::StreamId,
state: Arc<Mutex<State>>,
}

impl Substream {
fn new(id: quinn_proto::StreamId, state: Arc<Mutex<State>>) -> Self {
Self { id, state }
}
}

impl AsyncRead for Substream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut state = self.state.lock();

let mut stream = state.connection.recv_stream(self.id);
let mut chunks = match stream.read(true) {
Ok(chunks) => chunks,
Err(quinn_proto::ReadableError::UnknownStream) => {
return Poll::Ready(Ok(0));
}
Err(quinn_proto::ReadableError::IllegalOrderedRead) => {
unreachable!(
"Illegal ordered read can only happen if `stream.read(false)` is used."
);
}
};
let mut bytes = 0;
let mut pending = false;
loop {
let chunk = match chunks.next(buf.len()) {
Ok(Some(chunk)) if !chunk.bytes.is_empty() => chunk,
Ok(_) => break,
Err(err @ quinn_proto::ReadError::Reset(_)) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err)))
}
Err(quinn_proto::ReadError::Blocked) => {
pending = true;
break;
}
};

buf.write_all(&chunk.bytes).expect("enough buffer space");
bytes += chunk.bytes.len();
}
if chunks.finalize().should_transmit() {
if let Some(waker) = state.poll_connection_waker.take() {
waker.wake();
}
}
if pending && bytes == 0 {
let substream_state = state.unchecked_substream_state(self.id);
substream_state.read_waker = Some(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(Ok(bytes))
}
}
}

impl AsyncWrite for Substream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let mut state = self.state.lock();

match state.connection.send_stream(self.id).write(buf) {
Ok(bytes) => {
if let Some(waker) = state.poll_connection_waker.take() {
waker.wake();
}
Poll::Ready(Ok(bytes))
}
Err(quinn_proto::WriteError::Blocked) => {
let substream_state = state.unchecked_substream_state(self.id);
substream_state.write_waker = Some(cx.waker().clone());
Poll::Pending
}
Err(err @ quinn_proto::WriteError::Stopped(_)) => {
Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err)))
}
Err(quinn_proto::WriteError::UnknownStream) => {
Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
}
}
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
// quinn doesn't support flushing, calling close will flush all substreams.
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let mut inner = self.state.lock();

if inner.unchecked_substream_state(self.id).is_write_closed {
return Poll::Ready(Ok(()));
}

match inner.connection.send_stream(self.id).finish() {
Ok(()) => {
let substream_state = inner.unchecked_substream_state(self.id);
substream_state.finished_waker = Some(cx.waker().clone());
Poll::Pending
}
Err(err @ quinn_proto::FinishError::Stopped(_)) => {
Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err)))
}
Err(quinn_proto::FinishError::UnknownStream) => {
Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
}
}
}
}

impl Drop for Substream {
fn drop(&mut self) {
let mut state = self.state.lock();
state.substreams.remove(&self.id);
let _ = state.connection.recv_stream(self.id).stop(0u32.into());
let mut send_stream = state.connection.send_stream(self.id);
match send_stream.finish() {
Ok(()) => {}
// Already finished or reset, which is fine.
Err(quinn_proto::FinishError::UnknownStream) => {}
Err(quinn_proto::FinishError::Stopped(reason)) => {
let _ = send_stream.reset(reason);
}
}
}
}
File renamed without changes.

0 comments on commit 8034850

Please sign in to comment.