New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update stream impl for Framed to return None
after Err
#4166
Changes from 1 commit
3ae1591
b098fea
83f88bd
0d504bd
0577753
73d6d3f
0154048
d387593
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ pub(crate) struct ReadFrame { | |
pub(crate) eof: bool, | ||
pub(crate) is_readable: bool, | ||
pub(crate) buffer: BytesMut, | ||
pub(crate) has_errored: bool, | ||
} | ||
|
||
pub(crate) struct WriteFrame { | ||
|
@@ -49,6 +50,7 @@ impl Default for ReadFrame { | |
eof: false, | ||
is_readable: false, | ||
buffer: BytesMut::with_capacity(INITIAL_CAPACITY), | ||
has_errored: false, | ||
} | ||
} | ||
} | ||
|
@@ -72,6 +74,7 @@ impl From<BytesMut> for ReadFrame { | |
buffer, | ||
is_readable: size > 0, | ||
eof: false, | ||
has_errored: false, | ||
} | ||
} | ||
} | ||
|
@@ -126,30 +129,37 @@ where | |
// | ||
// The initial state is `reading`. | ||
// | ||
// | state | eof | is_readable | | ||
// |---------|-------|-------------| | ||
// | reading | false | false | | ||
// | framing | false | true | | ||
// | pausing | true | true | | ||
// | paused | true | false | | ||
// | ||
// `decode_eof` | ||
// returns `Some` read 0 bytes | ||
// │ │ │ │ | ||
// │ ▼ │ ▼ | ||
// ┌───────┐ `decode_eof` ┌──────┐ | ||
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐ | ||
// │ └───────┘ └──────┘ │ | ||
// pending read┐ │ ┌──────┐ │ ▲ │ | ||
// │ │ │ │ │ │ │ │ | ||
// │ ▼ │ │ `decode` returns `Some`│ pending read | ||
// │ ╔═══════╗ ┌───────┐◀─┘ │ | ||
// └──║reading║─read n>0 bytes─▶│framing│ │ | ||
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘ | ||
// ▲ │ | ||
// │ │ | ||
// └─`decode` returns `None`─┘ | ||
// | state | eof | is_readable | has_errored | | ||
// |---------|-------|-------------|-------------| | ||
// | reading | false | false | false | | ||
// | framing | false | true | false | | ||
// | pausing | true | true | false | | ||
// | paused | true | false | false | | ||
// | errored | <any> | <any> | true | `decode_eof` returns Err | ||
// ┌───────────────────────────────────────┐ | ||
// `decode_eof` │ │ | ||
// returns Ok(`Some`) │ read 0 bytes │ | ||
// │ │ ┌───────┘ │ │ │ | ||
// │ ▼ │ │ ▼ │ | ||
// ┌───────────┐ `decode_eof` ┌──────┐ │ | ||
// ┌──read 0 bytes──▶│ pausing │─returns Ok(`None`)─▶│paused│───────┐ │ | ||
// │ └───────────┘ └──────┘ │ ▼ | ||
// pending read┐ │ ┌──────┐ │ ▲ │ ┌─────────┐ | ||
// │ │ │ │ │ │ │ │ │ errored │ | ||
// │ ▼ │ │ `decode` returns `Some` │ pending read └─────────┘ | ||
// │ ╔═══════╗ ┌───────────┐◀─┘ │ ▲ | ||
// └──║reading║─read n>0 bytes─▶│ framing │ │ │ | ||
// ╚═══════╝ └───────────┘◀──────read n>0 bytes┘ │ | ||
// ▲ │ │ │ | ||
// │ │ │ `decode` returns Err │ | ||
// └─`decode` returns `None`──┘ └────────────────────────────────────────────────┘ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This picture is kinda broken. I think you need to adjust some spaces to make stuff line up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might need to use a different character for the arrows. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The diagram is still pretty broken. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that might indeed be the best way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the late reply, I was traveling over the weekend. I'll push a new revision tonight. |
||
loop { | ||
// Return `None` if we have encountered an error from the underlying decoder | ||
// See: https://github.com/tokio-rs/tokio/issues/3976 | ||
if state.has_errored { | ||
return Poll::Ready(None); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently the stream is not fused when you reach EOF normally. Maybe we should put it back into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite sure what that means. If we move to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, but do we want to keep returning none if they keep trying to read from it? See #3272 for context. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see what you mean now. Yeah, it makes sense to move it to the |
||
|
||
// Repeatedly call `decode` or `decode_eof` while the buffer is "readable", | ||
// i.e. it _might_ contain data consumable as a frame or closing frame. | ||
// Both signal that there is no such data by returning `None`. | ||
|
@@ -165,7 +175,10 @@ where | |
// pausing or framing | ||
if state.eof { | ||
// pausing | ||
let frame = pinned.codec.decode_eof(&mut state.buffer)?; | ||
let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| { | ||
state.has_errored = true; | ||
err | ||
})?; | ||
if frame.is_none() { | ||
state.is_readable = false; // prepare pausing -> paused | ||
} | ||
|
@@ -176,7 +189,10 @@ where | |
// framing | ||
trace!("attempting to decode a frame"); | ||
|
||
if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { | ||
if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| { | ||
state.has_errored = true; | ||
op | ||
})? { | ||
trace!("frame decoded from buffer"); | ||
// implicit framing -> framing | ||
return Poll::Ready(Some(Ok(frame))); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some rustfmt failures:
You can fix them by running