diff --git a/CHANGES.md b/CHANGES.md index 0a7b26d7dc5..0d7311748c4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,14 +1,19 @@ # Changes -## Unreleased +## Unreleased - 2020-xx-xx ### Added -* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing slash, - or as the new addition, always trimming trailing slashes. +* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing + slash, or as the new addition, always trimming trailing slashes. + +### Changed +* Update actix-codec and actix-utils dependencies. + ## 3.0.0-beta.3 - 2020-08-17 ### Changed * Update `rustls` to 0.18 + ## 3.0.0-beta.2 - 2020-08-17 ### Changed * `PayloadConfig` is now also considered in `Bytes` and `String` extractors when set diff --git a/Cargo.toml b/Cargo.toml index 52d19c5721b..dbdab59376a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,9 +65,9 @@ name = "test_server" required-features = ["compress"] [dependencies] -actix-codec = "0.2.0" -actix-service = "1.0.2" -actix-utils = "1.0.6" +actix-codec = "0.3.0" +actix-service = "1.0.6" +actix-utils = "2.0.0" actix-router = "0.2.4" actix-rt = "1.1.1" actix-server = "1.0.0" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index e223f10c3d0..d8d674fb16c 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,7 +1,8 @@ # Changes ## Unreleased - +### Changed +* Update actix-codec and actix-utils dependencies. ## [2.0.0-beta.3] - 2020-08-14 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 5b65e83a903..750d1e0af90 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -41,9 +41,9 @@ actors = ["actix"] [dependencies] actix-service = "1.0.5" -actix-codec = "0.2.0" +actix-codec = "0.3.0" actix-connect = "2.0.0-alpha.4" -actix-utils = "1.0.6" +actix-utils = "2.0.0" actix-rt = "1.0.0" actix-threadpool = "0.3.1" actix-tls = { version = "2.0.0-alpha.2", optional = true } diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index eecf2ee6fc8..ec86dabb0d4 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -46,10 +46,10 @@ pub trait Connection { pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { /// Close connection - fn close(&mut self); + fn close(self: Pin<&mut Self>); /// Release connection to the connection pool - fn release(&mut self); + fn release(self: Pin<&mut Self>); } #[doc(hidden)] @@ -195,11 +195,15 @@ where match self { EitherConnection::A(con) => con .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A)))) + .map(|res| { + res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A))) + }) .boxed_local(), EitherConnection::B(con) => con .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B)))) + .map(|res| { + res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B))) + }) .boxed_local(), } } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 51e853b3db6..06cc05404f1 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -67,17 +67,17 @@ where }; // create Framed and send request - let mut framed = Framed::new(io, h1::ClientCodec::default()); - framed.send((head, body.size()).into()).await?; + let mut framed_inner = Framed::new(io, h1::ClientCodec::default()); + framed_inner.send((head, body.size()).into()).await?; // send request body match body.size() { BodySize::None | BodySize::Empty | BodySize::Sized(0) => (), - _ => send_body(body, &mut framed).await?, + _ => send_body(body, Pin::new(&mut framed_inner)).await?, }; // read response and init read body - let res = framed.into_future().await; + let res = Pin::new(&mut framed_inner).into_future().await; let (head, framed) = if let (Some(result), framed) = res { let item = result.map_err(SendRequestError::from)?; (item, framed) @@ -85,14 +85,14 @@ where return Err(SendRequestError::from(ConnectError::Disconnected)); }; - match framed.get_codec().message_type() { + match framed.codec_ref().message_type() { h1::MessageType::None => { - let force_close = !framed.get_codec().keepalive(); + let force_close = !framed.codec_ref().keepalive(); release_connection(framed, force_close); Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = PlStream::new(framed).boxed_local(); + let pl: PayloadStream = PlStream::new(framed_inner).boxed_local(); Ok((head, pl.into())) } } @@ -119,35 +119,36 @@ where } /// send request body to the peer -pub(crate) async fn send_body( +pub(crate) async fn send_body( body: B, - framed: &mut Framed, + mut framed: Pin<&mut Framed>, ) -> Result<(), SendRequestError> where - I: ConnectionLifetime, + T: ConnectionLifetime + Unpin, B: MessageBody, { - let mut eof = false; pin_mut!(body); + + let mut eof = false; while !eof { - while !eof && !framed.is_write_buf_full() { + while !eof && !framed.as_ref().is_write_buf_full() { match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(result) => { - framed.write(h1::Message::Chunk(Some(result?)))?; + framed.as_mut().write(h1::Message::Chunk(Some(result?)))?; } None => { eof = true; - framed.write(h1::Message::Chunk(None))?; + framed.as_mut().write(h1::Message::Chunk(None))?; } } } - if !framed.is_write_buf_empty() { - poll_fn(|cx| match framed.flush(cx) { + if !framed.as_ref().is_write_buf_empty() { + poll_fn(|cx| match framed.as_mut().flush(cx) { Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Pending => { - if !framed.is_write_buf_full() { + if !framed.as_ref().is_write_buf_full() { Poll::Ready(Ok(())) } else { Poll::Pending @@ -158,13 +159,14 @@ where } } - SinkExt::flush(framed).await?; + SinkExt::flush(Pin::into_inner(framed)).await?; Ok(()) } #[doc(hidden)] /// HTTP client connection pub struct H1Connection { + /// T should be `Unpin` io: Option, created: time::Instant, pool: Option>, @@ -175,7 +177,7 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, { /// Close connection - fn close(&mut self) { + fn close(mut self: Pin<&mut Self>) { if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() { pool.close(IoConnection::new( @@ -188,7 +190,7 @@ where } /// Release this connection to the connection pool - fn release(&mut self) { + fn release(mut self: Pin<&mut Self>) { if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() { pool.release(IoConnection::new( @@ -242,14 +244,18 @@ impl AsyncWrite for H1Connection } } +#[pin_project::pin_project] pub(crate) struct PlStream { + #[pin] framed: Option>, } impl PlStream { fn new(framed: Framed) -> Self { + let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); + PlStream { - framed: Some(framed.map_codec(|codec| codec.into_payload_codec())), + framed: Some(framed), } } } @@ -261,16 +267,16 @@ impl Stream for PlStream { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let this = self.get_mut(); + let mut this = self.project(); - match this.framed.as_mut().unwrap().next_item(cx)? { + match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? { Poll::Pending => Poll::Pending, Poll::Ready(Some(chunk)) => { if let Some(chunk) = chunk { Poll::Ready(Some(Ok(chunk))) } else { - let framed = this.framed.take().unwrap(); - let force_close = !framed.get_codec().keepalive(); + let framed = this.framed.as_mut().as_pin_mut().unwrap(); + let force_close = !framed.codec_ref().keepalive(); release_connection(framed, force_close); Poll::Ready(None) } @@ -280,14 +286,13 @@ impl Stream for PlStream { } } -fn release_connection(framed: Framed, force_close: bool) +fn release_connection(framed: Pin<&mut Framed>, force_close: bool) where T: ConnectionLifetime, { - let mut parts = framed.into_parts(); - if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() { - parts.io.release() + if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() { + framed.io_pin().release() } else { - parts.io.close() + framed.io_pin().close() } } diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index d0754ce8828..e93c077afdf 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -1,4 +1,5 @@ //! Error and Result module + use std::cell::RefCell; use std::io::Write; use std::str::Utf8Error; @@ -7,7 +8,7 @@ use std::{fmt, io, result}; use actix_codec::{Decoder, Encoder}; pub use actix_threadpool::BlockingError; -use actix_utils::framed::DispatcherError as FramedDispatcherError; +use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; use derive_more::{Display, From}; @@ -452,10 +453,10 @@ impl ResponseError for ContentTypeError { } } -impl ResponseError for FramedDispatcherError +impl + Decoder, I> ResponseError for FramedDispatcherError where E: fmt::Debug + fmt::Display, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { } diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index bcfc18cde03..2e0103409c7 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -173,13 +173,12 @@ impl Decoder for ClientPayloadCodec { } } -impl Encoder for ClientCodec { - type Item = Message<(RequestHeadType, BodySize)>; +impl Encoder> for ClientCodec { type Error = io::Error; fn encode( &mut self, - item: Self::Item, + item: Message<(RequestHeadType, BodySize)>, dst: &mut BytesMut, ) -> Result<(), Self::Error> { match item { diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index de2af9ee711..036f166705f 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -144,13 +144,12 @@ impl Decoder for Codec { } } -impl Encoder for Codec { - type Item = Message<(Response<()>, BodySize)>; +impl Encoder, BodySize)>> for Codec { type Error = io::Error; fn encode( &mut self, - item: Self::Item, + item: Message<(Response<()>, BodySize)>, dst: &mut BytesMut, ) -> Result<(), Self::Error> { match item { diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 4d1a1dc1bad..339a0f53874 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -548,10 +548,12 @@ where } #[doc(hidden)] +#[pin_project::pin_project] pub struct OneRequestServiceResponse where T: AsyncRead + AsyncWrite + Unpin, { + #[pin] framed: Option>, } @@ -562,16 +564,18 @@ where type Output = Result<(Request, Framed), ParseError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.framed.as_mut().unwrap().next_item(cx) { - Poll::Ready(Some(Ok(req))) => match req { + let this = self.as_mut().project(); + + match ready!(this.framed.as_pin_mut().unwrap().next_item(cx)) { + Some(Ok(req)) => match req { Message::Item(req) => { - Poll::Ready(Ok((req, self.framed.take().unwrap()))) + let mut this = self.as_mut().project(); + Poll::Ready(Ok((req, this.framed.take().unwrap()))) } Message::Chunk(_) => unreachable!("Something is wrong"), }, - Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), - Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)), - Poll::Pending => Poll::Pending, + Some(Err(err)) => Poll::Ready(Err(err)), + None => Poll::Ready(Err(ParseError::Incomplete)), } } } diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index c44925c7a8b..9e9c57137da 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -9,12 +9,13 @@ use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; -/// Send http/1 response +/// Send HTTP/1 response #[pin_project::pin_project] pub struct SendResponse { res: Option, BodySize)>>, #[pin] body: Option>, + #[pin] framed: Option>, } @@ -35,23 +36,30 @@ where impl Future for SendResponse where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, B: MessageBody + Unpin, { type Output = Result, Error>; // TODO: rethink if we need loops in polls - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); let mut body_done = this.body.is_none(); loop { let mut body_ready = !body_done; - let framed = this.framed.as_mut().unwrap(); // send body if this.res.is_none() && body_ready { - while body_ready && !body_done && !framed.is_write_buf_full() { + while body_ready + && !body_done + && !this + .framed + .as_ref() + .as_pin_ref() + .unwrap() + .is_write_buf_full() + { match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { Poll::Ready(item) => { // body is done when item is None @@ -59,6 +67,7 @@ where if body_done { let _ = this.body.take(); } + let framed = this.framed.as_mut().as_pin_mut().unwrap(); framed.write(Message::Chunk(item))?; } Poll::Pending => body_ready = false, @@ -66,6 +75,8 @@ where } } + let framed = this.framed.as_mut().as_pin_mut().unwrap(); + // flush write buffer if !framed.is_write_buf_empty() { match framed.flush(cx)? { @@ -96,6 +107,9 @@ where break; } } - Poll::Ready(Ok(this.framed.take().unwrap())) + + let framed = this.framed.take().unwrap(); + + Poll::Ready(Ok(framed)) } } diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index 733976a78fd..7c9628b1ae6 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -91,8 +91,7 @@ impl Codec { } } -impl Encoder for Codec { - type Item = Message; +impl Encoder for Codec { type Error = ProtocolError; fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { diff --git a/actix-http/src/ws/dispatcher.rs b/actix-http/src/ws/dispatcher.rs index 7a6b11b1867..b114217a0fa 100644 --- a/actix-http/src/ws/dispatcher.rs +++ b/actix-http/src/ws/dispatcher.rs @@ -4,16 +4,18 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoService, Service}; -use actix_utils::framed; +use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError}; use super::{Codec, Frame, Message}; +#[pin_project::pin_project] pub struct Dispatcher where S: Service + 'static, T: AsyncRead + AsyncWrite, { - inner: framed::Dispatcher, + #[pin] + inner: InnerDispatcher, } impl Dispatcher @@ -25,13 +27,13 @@ where { pub fn new>(io: T, service: F) -> Self { Dispatcher { - inner: framed::Dispatcher::new(Framed::new(io, Codec::new()), service), + inner: InnerDispatcher::new(Framed::new(io, Codec::new()), service), } } pub fn with>(framed: Framed, service: F) -> Self { Dispatcher { - inner: framed::Dispatcher::new(framed, service), + inner: InnerDispatcher::new(framed, service), } } } @@ -43,9 +45,9 @@ where S::Future: 'static, S::Error: 'static, { - type Output = Result<(), framed::DispatcherError>; + type Output = Result<(), DispatcherError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).poll(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) } } diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index ff9def85be4..5d86605f428 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -8,7 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::{body, h1, ws, Error, HttpService, Request, Response}; use actix_http_test::test_server; use actix_service::{fn_factory, Service}; -use actix_utils::framed::Dispatcher; +use actix_utils::dispatcher::Dispatcher; use bytes::Bytes; use futures_util::future; use futures_util::task::{Context, Poll}; @@ -59,7 +59,7 @@ where .await .unwrap(); - Dispatcher::new(framed.into_framed(ws::Codec::new()), service) + Dispatcher::new(framed.replace_codec(ws::Codec::new()), service) .await .map_err(|_| panic!()) }; diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 04c3415e098..58f7113e5d9 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "3.0.0-beta.3", default-features = false } actix-service = "1.0.1" -actix-utils = "1.0.3" +actix-utils = "2.0.0" bytes = "0.5.3" derive_more = "0.99.2" httparse = "1.3" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index a5f682cd5b8..b33acc22229 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" actix = "0.10.0-alpha.2" actix-web = { version = "3.0.0-beta.3", default-features = false } actix-http = "2.0.0-beta.3" -actix-codec = "0.2.0" +actix-codec = "0.3.0" bytes = "0.5.2" futures-channel = { version = "0.3.5", default-features = false } futures-core = { version = "0.3.5", default-features = false } diff --git a/awc/CHANGES.md b/awc/CHANGES.md index f4b31aef676..8d1b5885678 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## Unreleased - 2020-xx-xx +### Changed +* Update actix-codec dependency. + + ## 2.0.0-beta.3 - 2020-08-17 ### Changed * Update `rustls` to 0.18 diff --git a/awc/Cargo.toml b/awc/Cargo.toml index ea4450bb27a..ff0afaa1c5d 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -37,7 +37,7 @@ rustls = ["rust-tls", "actix-http/rustls"] compress = ["actix-http/compress"] [dependencies] -actix-codec = "0.2.0" +actix-codec = "0.3.0" actix-service = "1.0.1" actix-http = "2.0.0-beta.3" actix-rt = "1.0.0" @@ -61,7 +61,7 @@ actix-connect = { version = "2.0.0-alpha.4", features = ["openssl"] } actix-web = { version = "3.0.0-beta.2", features = ["openssl"] } actix-http = { version = "2.0.0-beta.3", features = ["openssl"] } actix-http-test = { version = "2.0.0-alpha.1", features = ["openssl"] } -actix-utils = "1.0.3" +actix-utils = "2.0.0" actix-server = "1.0.0" actix-tls = { version = "2.0.0-alpha.2", features = ["openssl", "rustls"] } brotli2 = "0.3.2" diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 618d653f5f2..7fbe1543aec 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -152,7 +152,7 @@ where let (head, framed) = connection.open_tunnel(RequestHeadType::from(head)).await?; - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } @@ -186,7 +186,7 @@ where .open_tunnel(RequestHeadType::Rc(head, extra_headers)) .await?; - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 6ad660c4112..8b01e571650 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -366,7 +366,7 @@ impl WebsocketsRequest { // response and ws framed Ok(( ClientResponse::new(head, Payload::None), - framed.map_codec(|_| { + framed.into_map_codec(|_| { if server_mode { ws::Codec::new().max_size(max_size) } else { diff --git a/awc/tests/test_ws.rs b/awc/tests/test_ws.rs index d3f66814f60..1c10686688a 100644 --- a/awc/tests/test_ws.rs +++ b/awc/tests/test_ws.rs @@ -32,7 +32,7 @@ async fn test_simple() { .await?; // start websocket service - let framed = framed.into_framed(ws::Codec::new()); + let framed = framed.replace_codec(ws::Codec::new()); ws::Dispatcher::with(framed, ws_service).await } }) diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index 079cad74aa1..71b906b9f4c 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -1,5 +1,8 @@ # Changes +## Unreleased - 2020-xx-xx +* Update actix-codec and actix-utils dependencies. + ## [2.0.0-alpha.1] - 2020-05-23 * Update the `time` dependency to 0.2.7 diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index f3df6d1f6af..13f27ab59bc 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -30,9 +30,9 @@ openssl = ["open-ssl", "awc/openssl"] [dependencies] actix-service = "1.0.1" -actix-codec = "0.2.0" +actix-codec = "0.3.0" actix-connect = "2.0.0-alpha.4" -actix-utils = "1.0.3" +actix-utils = "2.0.0" actix-rt = "1.0.0" actix-server = "1.0.0" actix-testing = "1.0.0"