Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update stream impl for Framed to return None after Err #4166

Merged
merged 8 commits into from Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed.rs
Expand Up @@ -106,6 +106,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
write: WriteFrame::default(),
},
Expand Down
82 changes: 56 additions & 26 deletions tokio-util/src/codec/framed_impl.rs
Expand Up @@ -27,10 +27,12 @@ pin_project! {
const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

#[derive(Debug)]
pub(crate) struct ReadFrame {
pub(crate) eof: bool,
pub(crate) is_readable: bool,
pub(crate) buffer: BytesMut,
pub(crate) has_errored: bool,
}

pub(crate) struct WriteFrame {
Expand All @@ -49,6 +51,7 @@ impl Default for ReadFrame {
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
has_errored: false,
}
}
}
Expand All @@ -72,6 +75,7 @@ impl From<BytesMut> for ReadFrame {
buffer,
is_readable: size > 0,
eof: false,
has_errored: false,
}
}
}
Expand Down Expand Up @@ -126,30 +130,42 @@ where
//
// The initial state is `reading`.
//
// | state | eof | is_readable |
// |---------|-------|-------------|
// | reading | false | false |
// | framing | false | true |
// | pausing | true | true |
// | paused | true | false |
//
// `decode_eof`
// returns `Some` read 0 bytes
// │ │ │ │
// │ ▼ │ ▼
// ┌───────┐ `decode_eof` ┌──────┐
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐
// │ └───────┘ └──────┘ │
// pending read┐ │ ┌──────┐ │ ▲ │
// │ │ │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read
// │ ╔═══════╗ ┌───────┐◀─┘ │
// └──║reading║─read n>0 bytes─▶│framing│ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘
// ▲ │
// │ │
// └─`decode` returns `None`─┘
// | state | eof | is_readable | has_errored |
// |---------|-------|-------------|-------------|
// | reading | false | false | false |
// | framing | false | true | false |
// | pausing | true | true | false |
// | paused | true | false | false |
// | errored | <any> | <any> | true |
// `decode_eof` returns Err
// ┌────────────────────────────────────────────────────────┐
// `decode_eof` returns │ │
// `Ok(Some)` │ │
// ┌─────┐ │ `decode_eof` returns After returning │
// Read 0 bytes ├─────▼──┴┐ `Ok(None)` ┌────────┐ ◄───┐ `None` ┌───▼─────┐
// ┌────────────────►│ Pausing ├───────────────────────►│ Paused ├─┐ └───────────┤ Errored │
// │ └─────────┘ └─┬──▲───┘ │ └───▲───▲─┘
// Pending read │ │ │ │ │ │
// ┌──────┐ │ `decode` returns `Some` │ └─────┘ │ │
// │ │ │ ┌──────┐ │ Pending │ │
// │ ┌────▼──┴─┐ Read n>0 bytes ┌┴──────▼─┐ read n>0 bytes │ read │ │
// └─┤ Reading ├───────────────►│ Framing │◄────────────────────────┘ │ │
// └──┬─▲────┘ └─────┬──┬┘ │ │
// │ │ │ │ `decode` returns Err │ │
// │ └───decode` returns `None`──┘ └───────────────────────────────────────────────────────┘ │
// │ read returns Err │
// └────────────────────────────────────────────────────────────────────────────────────────────┘
loop {
// Return `None` if we have encountered an error from the underlying decoder
// See: https://github.com/tokio-rs/tokio/issues/3976
if state.has_errored {
// preparing has_errored -> paused
trace!("Returning None and setting paused");
state.is_readable = false;
state.has_errored = false;
return Poll::Ready(None);
}

// Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`.
Expand All @@ -165,7 +181,11 @@ where
// pausing or framing
if state.eof {
// pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
})?;
if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused
}
Expand All @@ -176,7 +196,11 @@ where
// framing
trace!("attempting to decode a frame");

if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| {
trace!("Got an error, going to errored state");
state.has_errored = true;
op
})? {
trace!("frame decoded from buffer");
// implicit framing -> framing
return Poll::Ready(Some(Ok(frame)));
Expand All @@ -190,7 +214,13 @@ where
// Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer).map_err(
|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
},
)? {
Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending,
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed_read.rs
Expand Up @@ -51,6 +51,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
},
}
Expand Down
38 changes: 38 additions & 0 deletions tokio-util/tests/framed_stream.rs
@@ -0,0 +1,38 @@
use futures_core::stream::Stream;
use std::{io, pin::Pin};
use tokio_test::{assert_ready, io::Builder, task};
use tokio_util::codec::{BytesCodec, FramedRead};

macro_rules! pin {
($id:ident) => {
Pin::new(&mut $id)
};
}

macro_rules! assert_read {
($e:expr, $n:expr) => {{
let val = assert_ready!($e);
assert_eq!(val.unwrap().unwrap(), $n);
}};
}

#[tokio::test]
async fn return_none_after_error() {
let mut io = FramedRead::new(
Builder::new()
.read(b"abcdef")
.read_error(io::Error::new(io::ErrorKind::Other, "Resource errored out"))
.read(b"more data")
.build(),
BytesCodec::new(),
);

let mut task = task::spawn(());

task.enter(|cx, _| {
assert_read!(pin!(io).poll_next(cx), b"abcdef".to_vec());
assert!(assert_ready!(pin!(io).poll_next(cx)).unwrap().is_err());
assert!(assert_ready!(pin!(io).poll_next(cx)).is_none());
assert_read!(pin!(io).poll_next(cx), b"more data".to_vec());
})
}