Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix stuck connection when handler doesn't read payload #2624

Merged
merged 5 commits into from Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions actix-http/CHANGES.md
@@ -1,6 +1,14 @@
# Changes

## Unreleased - 2021-xx-xx
### Changed
- `error::DispatcherError` enum is now marked `#[non_exhaustive]`. [#2624]


### Fixed
- Issue where handlers that took payload but then dropped without reading it to EOF it would cause keep-alive connections to become stuck. [#2624]

[#2624]: https://github.com/actix/actix-web/pull/2624


## 3.0.0-rc.1 - 2022-01-31
Expand Down
5 changes: 5 additions & 0 deletions actix-http/src/error.rs
Expand Up @@ -340,6 +340,7 @@ impl From<PayloadError> for Error {

/// A set of errors that can occur during dispatching HTTP requests.
#[derive(Debug, Display, From)]
#[non_exhaustive]
pub enum DispatchError {
/// Service error.
#[display(fmt = "Service Error")]
Expand Down Expand Up @@ -373,6 +374,10 @@ pub enum DispatchError {
#[display(fmt = "Connection shutdown timeout")]
DisconnectTimeout,

/// Handler dropped payload before reading EOF.
#[display(fmt = "Handler dropped payload before reading EOF")]
HandlerDroppedPayload,

/// Internal error.
#[display(fmt = "Internal error")]
InternalError,
Expand Down
2 changes: 2 additions & 0 deletions actix-http/src/h1/codec.rs
Expand Up @@ -125,11 +125,13 @@ impl Decoder for Codec {
self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version;
self.conn_type = head.connection_type();

if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{
self.conn_type = ConnectionType::Close
}

match payload {
PayloadType::None => self.payload = None,
PayloadType::Payload(pl) => self.payload = Some(pl),
Expand Down
8 changes: 5 additions & 3 deletions actix-http/src/h1/decoder.rs
Expand Up @@ -209,15 +209,16 @@ impl MessageType for Request {

let (len, method, uri, ver, h_len) = {
// SAFETY:
// Create an uninitialized array of `MaybeUninit`. The `assume_init` is
// safe because the type we are claiming to have initialized here is a
// bunch of `MaybeUninit`s, which do not require initialization.
// Create an uninitialized array of `MaybeUninit`. The `assume_init` is safe because the
// type we are claiming to have initialized here is a bunch of `MaybeUninit`s, which
// do not require initialization.
let mut parsed = unsafe {
MaybeUninit::<[MaybeUninit<httparse::Header<'_>>; MAX_HEADERS]>::uninit()
.assume_init()
};

let mut req = httparse::Request::new(&mut []);

match req.parse_with_uninit_headers(src, &mut parsed)? {
httparse::Status::Complete(len) => {
let method = Method::from_bytes(req.method.unwrap().as_bytes())
Expand All @@ -232,6 +233,7 @@ impl MessageType for Request {

(len, method, uri, version, req.headers.len())
}

httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Expand Down
73 changes: 68 additions & 5 deletions actix-http/src/h1/dispatcher.rs
Expand Up @@ -21,7 +21,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
};

use super::{
Expand Down Expand Up @@ -151,7 +151,8 @@ pin_project! {
error: Option<DispatchError>,

#[pin]
state: State<S, B, X>,
pub(super) state: State<S, B, X>,
// when Some(_) dispatcher is in state of receiving request payload
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,

Expand All @@ -174,7 +175,7 @@ enum DispatcherMessage {

pin_project! {
#[project = StateProj]
enum State<S, B, X>
pub(super) enum State<S, B, X>
where
S: Service<Request>,
X: Service<Request, Response = Request>,
Expand All @@ -194,7 +195,7 @@ where
X: Service<Request, Response = Request>,
B: MessageBody,
{
fn is_none(&self) -> bool {
pub(super) fn is_none(&self) -> bool {
matches!(self, State::None)
}
}
Expand Down Expand Up @@ -686,12 +687,74 @@ where
let can_not_read = !self.can_read(cx);

// limit amount of non-processed requests
if pipeline_queue_full || can_not_read {
if pipeline_queue_full {
return Ok(false);
}

let mut this = self.as_mut().project();

if can_not_read {
log::debug!("cannot read request payload");

if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
log::debug!("handler dropped payload early; attempt to clean connection");
// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
// payload decoded did not yield EOF yet
Message::Chunk(Some(_)) => {
// if non-clean connection, next loop iter will detect empty
// read buffer and close connection
}

// connection is in clean state for next request
Message::Chunk(None) => {
log::debug!("connection successfully cleaned");

// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);

// break out of payload decode loop
break;
}

// Either whole payload is read and loop is broken or more data
// was expected in which case connection is closed. In both
// situations dispatcher cannot get here.
Message::Item(_) => {
unreachable!("dispatcher is in payload receive state")
}
}
}

// not enough info to decide if connection is going to be clean or not
None => {
log::error!(
"handler did not read whole payload and dispatcher could not \
drain read buf; return 500 and close connection"
);

this.flags.insert(Flags::SHUTDOWN);
let mut res = Response::internal_server_error().drop_body();
res.head_mut().set_connection_type(ConnectionType::Close);
this.messages.push_back(DispatcherMessage::Error(res));
*this.error = Some(DispatchError::HandlerDroppedPayload);
return Ok(true);
}
}
}
}
} else {
// can_not_read and no request payload
return Ok(false);
}
}

let mut updated = false;

loop {
Expand Down