Skip to content

Commit

Permalink
move blocking error to web (#2660)
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Feb 22, 2022
1 parent 1c1d647 commit ad38973
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 46 deletions.
5 changes: 2 additions & 3 deletions actix-files/src/chunked.rs
Expand Up @@ -81,7 +81,7 @@ async fn chunked_read_file_callback(
) -> Result<(File, Bytes), Error> {
use io::{Read as _, Seek as _};

let res = actix_web::rt::task::spawn_blocking(move || {
let res = actix_web::web::block(move || {
let mut buf = Vec::with_capacity(max_bytes);

file.seek(io::SeekFrom::Start(offset))?;
Expand All @@ -94,8 +94,7 @@ async fn chunked_read_file_callback(
Ok((file, Bytes::from(buf)))
}
})
.await
.map_err(|_| actix_web::error::BlockingError)??;
.await??;

Ok(res)
}
Expand Down
4 changes: 4 additions & 0 deletions actix-http/CHANGES.md
@@ -1,6 +1,10 @@
# Changes

## Unreleased - 2021-xx-xx
### Removed
- `error::BlockingError` [#2660]

[#2660]: https://github.com/actix/actix-web/pull/2660


## 3.0.0-rc.4 - 2022-02-22
Expand Down
16 changes: 13 additions & 3 deletions actix-http/src/encoding/decoder.rs
Expand Up @@ -19,7 +19,7 @@ use zstd::stream::write::Decoder as ZstdDecoder;

use crate::{
encoding::Writer,
error::{BlockingError, PayloadError},
error::PayloadError,
header::{ContentEncoding, HeaderMap, CONTENT_ENCODING},
};

Expand Down Expand Up @@ -47,14 +47,17 @@ where
ContentEncoding::Brotli => Some(ContentDecoder::Brotli(Box::new(
brotli::DecompressorWriter::new(Writer::new(), 8_096),
))),

#[cfg(feature = "compress-gzip")]
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
ZlibDecoder::new(Writer::new()),
))),

#[cfg(feature = "compress-gzip")]
ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(GzDecoder::new(
Writer::new(),
)))),

#[cfg(feature = "compress-zstd")]
ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new(
ZstdDecoder::new(Writer::new()).expect(
Expand Down Expand Up @@ -98,8 +101,12 @@ where

loop {
if let Some(ref mut fut) = this.fut {
let (chunk, decoder) =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Blocking task was cancelled unexpectedly",
))
})??;

*this.decoder = Some(decoder);
this.fut.take();
Expand Down Expand Up @@ -159,10 +166,13 @@ where
enum ContentDecoder {
#[cfg(feature = "compress-gzip")]
Deflate(Box<ZlibDecoder<Writer>>),

#[cfg(feature = "compress-gzip")]
Gzip(Box<GzDecoder<Writer>>),

#[cfg(feature = "compress-brotli")]
Brotli(Box<brotli::DecompressorWriter<Writer>>),

// We need explicit 'static lifetime here because ZstdDecoder need lifetime
// argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static`
#[cfg(feature = "compress-zstd")]
Expand Down
14 changes: 8 additions & 6 deletions actix-http/src/encoding/encoder.rs
Expand Up @@ -23,7 +23,6 @@ use zstd::stream::write::Encoder as ZstdEncoder;
use super::Writer;
use crate::{
body::{self, BodySize, MessageBody},
error::BlockingError,
header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
ResponseHead, StatusCode,
};
Expand Down Expand Up @@ -173,7 +172,12 @@ where

if let Some(ref mut fut) = this.fut {
let mut encoder = ready!(Pin::new(fut).poll(cx))
.map_err(|_| EncoderError::Blocking(BlockingError))?
.map_err(|_| {
EncoderError::Io(io::Error::new(
io::ErrorKind::Other,
"Blocking task was cancelled unexpectedly",
))
})?
.map_err(EncoderError::Io)?;

let chunk = encoder.take();
Expand Down Expand Up @@ -400,12 +404,11 @@ fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
#[derive(Debug, Display)]
#[non_exhaustive]
pub enum EncoderError {
/// Wrapped body stream error.
#[display(fmt = "body")]
Body(Box<dyn StdError>),

#[display(fmt = "blocking")]
Blocking(BlockingError),

/// Generic I/O error.
#[display(fmt = "io")]
Io(io::Error),
}
Expand All @@ -414,7 +417,6 @@ impl StdError for EncoderError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
EncoderError::Body(err) => Some(&**err),
EncoderError::Blocking(err) => Some(err),
EncoderError::Io(err) => Some(err),
}
}
Expand Down
23 changes: 4 additions & 19 deletions actix-http/src/error.rs
Expand Up @@ -51,7 +51,7 @@ impl Error {
Self::new(Kind::SendResponse)
}

#[allow(unused)] // reserved for future use (TODO: remove allow when being used)
#[allow(unused)] // available for future use
pub(crate) fn new_io() -> Self {
Self::new(Kind::Io)
}
Expand Down Expand Up @@ -252,12 +252,6 @@ impl From<ParseError> for Response<BoxBody> {
}
}

/// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display, Error)]
#[display(fmt = "Blocking thread pool is gone")]
// TODO: non-exhaustive
pub struct BlockingError;

/// A set of errors that can occur during payload parsing.
#[derive(Debug, Display)]
#[non_exhaustive]
Expand Down Expand Up @@ -295,13 +289,13 @@ impl std::error::Error for PayloadError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PayloadError::Incomplete(None) => None,
PayloadError::Incomplete(Some(err)) => Some(err as &dyn std::error::Error),
PayloadError::Incomplete(Some(err)) => Some(err),
PayloadError::EncodingCorrupted => None,
PayloadError::Overflow => None,
PayloadError::UnknownLength => None,
#[cfg(feature = "http2")]
PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error),
PayloadError::Io(err) => Some(err as &dyn std::error::Error),
PayloadError::Http2Payload(err) => Some(err),
PayloadError::Io(err) => Some(err),
}
}
}
Expand All @@ -325,15 +319,6 @@ impl From<io::Error> for PayloadError {
}
}

impl From<BlockingError> for PayloadError {
fn from(_: BlockingError) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
))
}
}

impl From<PayloadError> for Error {
fn from(err: PayloadError) -> Self {
Self::new_payload().with_cause(err)
Expand Down
3 changes: 2 additions & 1 deletion actix-http/tests/test_server.rs
Expand Up @@ -850,7 +850,8 @@ async fn not_modified_spec_h1() {
Some(&header::HeaderValue::from_static("4")),
);
// server does not prevent payload from being sent but clients may choose not to read it
// TODO: this is probably a bug, especially since CL header can differ in length from the body
// TODO: this is probably a bug in the client, especially since CL header can differ in length
// from the body
assert!(!srv.load_body(res).await.unwrap().is_empty());

// TODO: add stream response tests
Expand Down
1 change: 1 addition & 0 deletions actix-web/CHANGES.md
@@ -1,6 +1,7 @@
# Changes

## Unreleased - 2021-xx-xx
### Changed
- Rename `test::{simple_service => status_service}`. [#2659]

[#2659]: https://github.com/actix/actix-web/pull/2659
Expand Down
1 change: 0 additions & 1 deletion actix-web/src/error/error.rs
Expand Up @@ -47,7 +47,6 @@ impl fmt::Debug for Error {

impl StdError for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
// TODO: populate if replacement for Box<dyn Error> is found
None
}
}
Expand Down
10 changes: 9 additions & 1 deletion actix-web/src/error/mod.rs
Expand Up @@ -6,7 +6,7 @@
//
// See <https://github.com/rust-lang/rust/issues/83375>
pub use actix_http::error::{
BlockingError, ContentTypeError, DispatchError, HttpError, ParseError, PayloadError,
ContentTypeError, DispatchError, HttpError, ParseError, PayloadError,
};

use derive_more::{Display, Error, From};
Expand All @@ -33,6 +33,14 @@ pub(crate) use macros::{downcast_dyn, downcast_get_type_id};
/// This type alias is generally used to avoid writing out `actix_http::Error` directly.
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// An error representing a problem running a blocking task on a thread pool.
#[derive(Debug, Display, Error)]
#[display(fmt = "Blocking thread pool is shut down unexpectedly")]
#[non_exhaustive]
pub struct BlockingError;

impl ResponseError for crate::error::BlockingError {}

/// Errors which can occur when attempting to generate resource uri.
#[derive(Debug, PartialEq, Display, Error, From)]
#[non_exhaustive]
Expand Down
20 changes: 9 additions & 11 deletions actix-web/src/error/response_error.rs
Expand Up @@ -6,20 +6,22 @@ use std::{
io::{self, Write as _},
};

use actix_http::{
body::BoxBody,
header::{self, TryIntoHeaderValue},
Response, StatusCode,
};
use actix_http::Response;
use bytes::BytesMut;

use crate::{
body::BoxBody,
error::{downcast_dyn, downcast_get_type_id},
helpers, HttpResponse,
helpers,
http::{
header::{self, TryIntoHeaderValue},
StatusCode,
},
HttpResponse,
};

/// Errors that can generate responses.
// TODO: add std::error::Error bound when replacement for Box<dyn Error> is found
// TODO: flesh out documentation
pub trait ResponseError: fmt::Debug + fmt::Display {
/// Returns appropriate status code for error.
///
Expand Down Expand Up @@ -73,7 +75,6 @@ impl ResponseError for std::str::Utf8Error {

impl ResponseError for std::io::Error {
fn status_code(&self) -> StatusCode {
// TODO: decide if these errors should consider not found or permission errors
match self.kind() {
io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN,
Expand All @@ -86,7 +87,6 @@ impl ResponseError for actix_http::error::HttpError {}

impl ResponseError for actix_http::Error {
fn status_code(&self) -> StatusCode {
// TODO: map error kinds to status code better
StatusCode::INTERNAL_SERVER_ERROR
}

Expand All @@ -107,8 +107,6 @@ impl ResponseError for actix_http::error::ParseError {
}
}

impl ResponseError for actix_http::error::BlockingError {}

impl ResponseError for actix_http::error::PayloadError {
fn status_code(&self) -> StatusCode {
match *self {
Expand Down
1 change: 0 additions & 1 deletion actix-web/src/http/mod.rs
Expand Up @@ -2,5 +2,4 @@

pub mod header;

// TODO: figure out how best to expose http::Error vs actix_http::Error
pub use actix_http::{uri, ConnectionType, Error, KeepAlive, Method, StatusCode, Uri, Version};

0 comments on commit ad38973

Please sign in to comment.