Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update stream impl for Framed to return None after Err #4166

Merged
merged 8 commits into from Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed.rs
Expand Up @@ -106,6 +106,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
write: WriteFrame::default(),
},
Expand Down
82 changes: 56 additions & 26 deletions tokio-util/src/codec/framed_impl.rs
Expand Up @@ -27,10 +27,12 @@ pin_project! {
const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

#[derive(Debug)]
pub(crate) struct ReadFrame {
pub(crate) eof: bool,
pub(crate) is_readable: bool,
pub(crate) buffer: BytesMut,
pub(crate) has_errored: bool,
}

pub(crate) struct WriteFrame {
Expand All @@ -49,6 +51,7 @@ impl Default for ReadFrame {
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
has_errored: false,
}
}
}
Expand All @@ -72,6 +75,7 @@ impl From<BytesMut> for ReadFrame {
buffer,
is_readable: size > 0,
eof: false,
has_errored: false,
}
}
}
Expand Down Expand Up @@ -126,30 +130,42 @@ where
//
// The initial state is `reading`.
//
// | state | eof | is_readable |
// |---------|-------|-------------|
// | reading | false | false |
// | framing | false | true |
// | pausing | true | true |
// | paused | true | false |
//
// `decode_eof`
// returns `Some` read 0 bytes
// │ │ │ │
// │ ▼ │ ▼
// ┌───────┐ `decode_eof` ┌──────┐
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐
// │ └───────┘ └──────┘ │
// pending read┐ │ ┌──────┐ │ ▲ │
// │ │ │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read
// │ ╔═══════╗ ┌───────┐◀─┘ │
// └──║reading║─read n>0 bytes─▶│framing│ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘
// ▲ │
// │ │
// └─`decode` returns `None`─┘
// | state | eof | is_readable | has_errored |
// |---------|-------|-------------|-------------|
// | reading | false | false | false |
// | framing | false | true | false |
// | pausing | true | true | false |
// | paused | true | false | false |
// | errored | <any> | <any> | true |
// `decode_eof` returns Err
// ┌────────────────────────────────────────────────────────┐
// `decode_eof` returns │ │
// `Ok(Some)` │ │
// ┌─────┐ │ `decode_eof` returns After returning │
// Read 0 bytes ├─────▼──┴┐ `Ok(None)` ┌────────┐ ◄───┐ `None` ┌───▼─────┐
// ┌────────────────►│ Pausing ├───────────────────────►│ Paused ├─┐ └───────────┤ Errored │
// │ └─────────┘ └─┬──▲───┘ │ └───▲───▲─┘
// Pending read │ │ │ │ │ │
// ┌──────┐ │ `decode` returns `Some` │ └─────┘ │ │
// │ │ │ ┌──────┐ │ Pending │ │
// │ ┌────▼──┴─┐ Read n>0 bytes ┌┴──────▼─┐ read n>0 bytes │ read │ │
// └─┤ Reading ├───────────────►│ Framing │◄────────────────────────┘ │ │
// └──┬─▲────┘ └─────┬──┬┘ │ │
// │ │ │ │ `decode` returns Err │ │
// │ └───decode` returns `None`──┘ └───────────────────────────────────────────────────────┘ │
// │ read returns Err │
// └────────────────────────────────────────────────────────────────────────────────────────────┘
loop {
// Return `None` if we have encountered an error from the underlying decoder
// See: https://github.com/tokio-rs/tokio/issues/3976
if state.has_errored {
// preparing has_errored -> paused
trace!("Returning None and setting paused");
state.is_readable = false;
state.has_errored = false;
return Poll::Ready(None);
}

// Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`.
Expand All @@ -165,7 +181,11 @@ where
// pausing or framing
if state.eof {
// pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
})?;
if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused
}
Expand All @@ -176,7 +196,11 @@ where
// framing
trace!("attempting to decode a frame");

if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| {
trace!("Got an error, going to errored state");
state.has_errored = true;
op
})? {
trace!("frame decoded from buffer");
// implicit framing -> framing
return Poll::Ready(Some(Ok(frame)));
Expand All @@ -190,7 +214,13 @@ where
// Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer).map_err(
|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
},
)? {
Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending,
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed_read.rs
Expand Up @@ -51,6 +51,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
},
}
Expand Down
80 changes: 80 additions & 0 deletions tokio-util/tests/framed_stream.rs
@@ -0,0 +1,80 @@
use futures::Stream;
use std::{
collections::VecDeque,
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};
use tokio_test::{assert_ready, task};
use tokio_util::codec::{BytesCodec, FramedRead};

macro_rules! mock {
($($x:expr,)*) => {{
let mut v = VecDeque::new();
v.extend(vec![$($x),*]);
Mock { calls: v }
}};
}

macro_rules! pin {
($id:ident) => {
Pin::new(&mut $id)
};
}

macro_rules! assert_read {
($e:expr, $n:expr) => {{
let val = assert_ready!($e);
assert_eq!(val.unwrap().unwrap(), $n);
}};
}

#[test]
fn return_none_after_error() {
let mut io = FramedRead::new(
mock! {
Ok(b"abcdef".to_vec()),
Err(io::Error::new(io::ErrorKind::Other, "Resource errored out")),
Ok(b"more data".to_vec()),
},
BytesCodec::new(),
);

let mut task = task::spawn(());

task.enter(|cx, _| {
assert_read!(pin!(io).poll_next(cx), b"abcdef".to_vec());
let val = assert_ready!(pin!(io).poll_next(cx));
assert!(val.unwrap().is_err());
let val = assert_ready!(pin!(io).poll_next(cx));
assert!(val.is_none());
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
assert_read!(pin!(io).poll_next(cx), b"more data".to_vec());
})
}

// ================= Mock =================
struct Mock {
calls: VecDeque<io::Result<Vec<u8>>>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason you aren't using the mock in tokio-test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the pattern in the other codec tests mostly. I saw that there's a lot of duplication, but didn't know about tokio-test itself. Do you want me to move this test to use tokio-test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be cleaner to use tokio-test.


impl AsyncRead for Mock {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
use io::ErrorKind::WouldBlock;

match self.calls.pop_front() {
Some(Ok(data)) => {
debug_assert!(buf.remaining() >= data.len());
buf.put_slice(&data);
Poll::Ready(Ok(()))
}
Some(Err(ref e)) if e.kind() == WouldBlock => Poll::Pending,
Some(Err(e)) => Poll::Ready(Err(e)),
None => Poll::Ready(Ok(())),
}
}
}