Skip to content

Commit

Permalink
fixup! Properly propagate substream state to caller
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Oct 8, 2022
1 parent 745b6c5 commit 5a9f29e
Showing 1 changed file with 12 additions and 27 deletions.
39 changes: 12 additions & 27 deletions muxers/mplex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,6 @@ where
id
}

/// Checks whether a substream is open for reading.
fn can_read(&self, id: &LocalStreamId) -> bool {
matches!(
self.substreams.get(id),
Some(SubstreamState::Open { .. }) | Some(SubstreamState::SendClosed { .. })
)
}

/// Sends pending frames, without flushing.
fn send_pending_frames(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
while let Some(frame) = self.pending_frames.pop_back() {
Expand Down Expand Up @@ -1266,33 +1258,26 @@ mod tests {
}
}

// Drain the buffer by reading from the stream.
for _ in 0..cfg.max_buffer_len + 1 {
match m.poll_read_stream(cx, id) {
Poll::Ready(Ok(Some(bytes))) => {
assert_eq!(bytes, data);
}
x => panic!("Unexpected: {:?}", x),
}
}

// Read from the stream after the buffer has been drained,
// expecting either EOF or further data, depending on
// the `MaxBufferBehaviour`.
// Try to read from the stream:
// - Resets are prioritized over data frames so we receive a reset right away.
// - If our behaviour is "block", we should be able to read all the data.
match cfg.max_buffer_behaviour {
MaxBufferBehaviour::ResetStream => {
// Expect to read EOF
match m.poll_read_stream(cx, id) {
Poll::Ready(Ok(None)) => {}
Poll::Ready(Err(e)) if e.kind() == ErrorKind::ConnectionReset => {}
poll => panic!("Unexpected: {:?}", poll),
}
}
MaxBufferBehaviour::Block => {
// Expect to be able to continue reading.
match m.poll_read_stream(cx, id) {
Poll::Ready(Ok(Some(bytes))) => assert_eq!(bytes, data),
Poll::Pending => assert_eq!(overflow.get(), 1),
poll => panic!("Unexpected: {:?}", poll),
for _ in 0..cfg.max_buffer_len + 2 {
match m.poll_read_stream(cx, id) {
Poll::Ready(Ok(Some(bytes))) => {
assert_eq!(bytes, data);
}
Poll::Pending => assert_eq!(overflow.get(), 1),
poll => panic!("Unexpected: {:?}", poll),
}
}
}
}
Expand Down

0 comments on commit 5a9f29e

Please sign in to comment.