From 1a13e5d02c3192d10669e0fdc681868aa8c120a2 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 18 Aug 2020 17:57:28 +0100 Subject: [PATCH] update actix-codec/utils dependency --- Cargo.toml | 6 ++- actix-http/src/client/connection.rs | 12 ++++-- actix-http/src/client/h1proto.rs | 65 ++++++++++++++++------------- actix-http/src/error.rs | 7 ++-- actix-http/src/h1/client.rs | 5 +-- actix-http/src/h1/codec.rs | 5 +-- actix-http/src/h1/service.rs | 16 ++++--- actix-http/src/h1/utils.rs | 30 +++++++++---- actix-http/src/ws/codec.rs | 3 +- actix-http/src/ws/dispatcher.rs | 16 +++---- awc/src/connect.rs | 4 +- awc/src/ws.rs | 2 +- 12 files changed, 102 insertions(+), 69 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52d19c5721b..e1033ecd027 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ required-features = ["compress"] [dependencies] actix-codec = "0.2.0" -actix-service = "1.0.2" +actix-service = "1.0.6" actix-utils = "1.0.6" actix-router = "0.2.4" actix-rt = "1.1.1" @@ -123,6 +123,10 @@ actix-web-codegen = { path = "actix-web-codegen" } actix-files = { path = "actix-files" } actix-multipart = { path = "actix-multipart" } awc = { path = "awc" } +actix-utils = { git = "https://github.com/actix/actix-net", branch = "fix/framed" } +actix-codec = { git = "https://github.com/actix/actix-net", branch = "fix/framed" } +# actix-codec = { path = "../actix-net/actix-codec" } +# actix-utils = { path = "../actix-net/actix-utils" } [[example]] name = "client" diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index eecf2ee6fc8..ec86dabb0d4 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -46,10 +46,10 @@ pub trait Connection { pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { /// Close connection - fn close(&mut self); + fn close(self: Pin<&mut Self>); /// Release connection to the connection pool - fn release(&mut self); + fn release(self: Pin<&mut Self>); } #[doc(hidden)] @@ -195,11 +195,15 @@ where match self { EitherConnection::A(con) => con .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A)))) + .map(|res| { + res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A))) + }) .boxed_local(), EitherConnection::B(con) => con .open_tunnel(head) - .map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B)))) + .map(|res| { + res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B))) + }) .boxed_local(), } } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 51e853b3db6..06cc05404f1 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -67,17 +67,17 @@ where }; // create Framed and send request - let mut framed = Framed::new(io, h1::ClientCodec::default()); - framed.send((head, body.size()).into()).await?; + let mut framed_inner = Framed::new(io, h1::ClientCodec::default()); + framed_inner.send((head, body.size()).into()).await?; // send request body match body.size() { BodySize::None | BodySize::Empty | BodySize::Sized(0) => (), - _ => send_body(body, &mut framed).await?, + _ => send_body(body, Pin::new(&mut framed_inner)).await?, }; // read response and init read body - let res = framed.into_future().await; + let res = Pin::new(&mut framed_inner).into_future().await; let (head, framed) = if let (Some(result), framed) = res { let item = result.map_err(SendRequestError::from)?; (item, framed) @@ -85,14 +85,14 @@ where return Err(SendRequestError::from(ConnectError::Disconnected)); }; - match framed.get_codec().message_type() { + match framed.codec_ref().message_type() { h1::MessageType::None => { - let force_close = !framed.get_codec().keepalive(); + let force_close = !framed.codec_ref().keepalive(); release_connection(framed, force_close); Ok((head, Payload::None)) } _ => { - let pl: PayloadStream = PlStream::new(framed).boxed_local(); + let pl: PayloadStream = PlStream::new(framed_inner).boxed_local(); Ok((head, pl.into())) } } @@ -119,35 +119,36 @@ where } /// send request body to the peer -pub(crate) async fn send_body( +pub(crate) async fn send_body( body: B, - framed: &mut Framed, + mut framed: Pin<&mut Framed>, ) -> Result<(), SendRequestError> where - I: ConnectionLifetime, + T: ConnectionLifetime + Unpin, B: MessageBody, { - let mut eof = false; pin_mut!(body); + + let mut eof = false; while !eof { - while !eof && !framed.is_write_buf_full() { + while !eof && !framed.as_ref().is_write_buf_full() { match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(result) => { - framed.write(h1::Message::Chunk(Some(result?)))?; + framed.as_mut().write(h1::Message::Chunk(Some(result?)))?; } None => { eof = true; - framed.write(h1::Message::Chunk(None))?; + framed.as_mut().write(h1::Message::Chunk(None))?; } } } - if !framed.is_write_buf_empty() { - poll_fn(|cx| match framed.flush(cx) { + if !framed.as_ref().is_write_buf_empty() { + poll_fn(|cx| match framed.as_mut().flush(cx) { Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Pending => { - if !framed.is_write_buf_full() { + if !framed.as_ref().is_write_buf_full() { Poll::Ready(Ok(())) } else { Poll::Pending @@ -158,13 +159,14 @@ where } } - SinkExt::flush(framed).await?; + SinkExt::flush(Pin::into_inner(framed)).await?; Ok(()) } #[doc(hidden)] /// HTTP client connection pub struct H1Connection { + /// T should be `Unpin` io: Option, created: time::Instant, pool: Option>, @@ -175,7 +177,7 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, { /// Close connection - fn close(&mut self) { + fn close(mut self: Pin<&mut Self>) { if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() { pool.close(IoConnection::new( @@ -188,7 +190,7 @@ where } /// Release this connection to the connection pool - fn release(&mut self) { + fn release(mut self: Pin<&mut Self>) { if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() { pool.release(IoConnection::new( @@ -242,14 +244,18 @@ impl AsyncWrite for H1Connection } } +#[pin_project::pin_project] pub(crate) struct PlStream { + #[pin] framed: Option>, } impl PlStream { fn new(framed: Framed) -> Self { + let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); + PlStream { - framed: Some(framed.map_codec(|codec| codec.into_payload_codec())), + framed: Some(framed), } } } @@ -261,16 +267,16 @@ impl Stream for PlStream { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let this = self.get_mut(); + let mut this = self.project(); - match this.framed.as_mut().unwrap().next_item(cx)? { + match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? { Poll::Pending => Poll::Pending, Poll::Ready(Some(chunk)) => { if let Some(chunk) = chunk { Poll::Ready(Some(Ok(chunk))) } else { - let framed = this.framed.take().unwrap(); - let force_close = !framed.get_codec().keepalive(); + let framed = this.framed.as_mut().as_pin_mut().unwrap(); + let force_close = !framed.codec_ref().keepalive(); release_connection(framed, force_close); Poll::Ready(None) } @@ -280,14 +286,13 @@ impl Stream for PlStream { } } -fn release_connection(framed: Framed, force_close: bool) +fn release_connection(framed: Pin<&mut Framed>, force_close: bool) where T: ConnectionLifetime, { - let mut parts = framed.into_parts(); - if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() { - parts.io.release() + if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() { + framed.io_pin().release() } else { - parts.io.close() + framed.io_pin().close() } } diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index d0754ce8828..e93c077afdf 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -1,4 +1,5 @@ //! Error and Result module + use std::cell::RefCell; use std::io::Write; use std::str::Utf8Error; @@ -7,7 +8,7 @@ use std::{fmt, io, result}; use actix_codec::{Decoder, Encoder}; pub use actix_threadpool::BlockingError; -use actix_utils::framed::DispatcherError as FramedDispatcherError; +use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; use derive_more::{Display, From}; @@ -452,10 +453,10 @@ impl ResponseError for ContentTypeError { } } -impl ResponseError for FramedDispatcherError +impl + Decoder, I> ResponseError for FramedDispatcherError where E: fmt::Debug + fmt::Display, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { } diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index bcfc18cde03..2e0103409c7 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -173,13 +173,12 @@ impl Decoder for ClientPayloadCodec { } } -impl Encoder for ClientCodec { - type Item = Message<(RequestHeadType, BodySize)>; +impl Encoder> for ClientCodec { type Error = io::Error; fn encode( &mut self, - item: Self::Item, + item: Message<(RequestHeadType, BodySize)>, dst: &mut BytesMut, ) -> Result<(), Self::Error> { match item { diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index de2af9ee711..036f166705f 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -144,13 +144,12 @@ impl Decoder for Codec { } } -impl Encoder for Codec { - type Item = Message<(Response<()>, BodySize)>; +impl Encoder, BodySize)>> for Codec { type Error = io::Error; fn encode( &mut self, - item: Self::Item, + item: Message<(Response<()>, BodySize)>, dst: &mut BytesMut, ) -> Result<(), Self::Error> { match item { diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 4d1a1dc1bad..339a0f53874 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -548,10 +548,12 @@ where } #[doc(hidden)] +#[pin_project::pin_project] pub struct OneRequestServiceResponse where T: AsyncRead + AsyncWrite + Unpin, { + #[pin] framed: Option>, } @@ -562,16 +564,18 @@ where type Output = Result<(Request, Framed), ParseError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.framed.as_mut().unwrap().next_item(cx) { - Poll::Ready(Some(Ok(req))) => match req { + let this = self.as_mut().project(); + + match ready!(this.framed.as_pin_mut().unwrap().next_item(cx)) { + Some(Ok(req)) => match req { Message::Item(req) => { - Poll::Ready(Ok((req, self.framed.take().unwrap()))) + let mut this = self.as_mut().project(); + Poll::Ready(Ok((req, this.framed.take().unwrap()))) } Message::Chunk(_) => unreachable!("Something is wrong"), }, - Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), - Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)), - Poll::Pending => Poll::Pending, + Some(Err(err)) => Poll::Ready(Err(err)), + None => Poll::Ready(Err(ParseError::Incomplete)), } } } diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index c44925c7a8b..0aef73fabc1 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -9,12 +9,13 @@ use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; -/// Send http/1 response +/// Send HTTP/1 response #[pin_project::pin_project] pub struct SendResponse { res: Option, BodySize)>>, #[pin] body: Option>, + #[pin] framed: Option>, } @@ -40,18 +41,24 @@ where { type Output = Result, Error>; - // TODO: rethink if we need loops in polls - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); let mut body_done = this.body.is_none(); loop { let mut body_ready = !body_done; - let framed = this.framed.as_mut().unwrap(); // send body if this.res.is_none() && body_ready { - while body_ready && !body_done && !framed.is_write_buf_full() { + while body_ready + && !body_done + && !this + .framed + .as_ref() + .as_pin_ref() + .unwrap() + .is_write_buf_full() + { match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { Poll::Ready(item) => { // body is done when item is None @@ -59,6 +66,7 @@ where if body_done { let _ = this.body.take(); } + let framed = this.framed.as_mut().as_pin_mut().unwrap(); framed.write(Message::Chunk(item))?; } Poll::Pending => body_ready = false, @@ -66,6 +74,8 @@ where } } + let framed = this.framed.as_mut().as_pin_mut().unwrap(); + // flush write buffer if !framed.is_write_buf_empty() { match framed.flush(cx)? { @@ -96,6 +106,12 @@ where break; } } - Poll::Ready(Ok(this.framed.take().unwrap())) + + // TODO: refactor to remove need for unsafe + let framed = unsafe { Pin::get_unchecked_mut(this.framed.as_mut()) } + .take() + .unwrap(); + + Poll::Ready(Ok(framed)) } } diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index 733976a78fd..7c9628b1ae6 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -91,8 +91,7 @@ impl Codec { } } -impl Encoder for Codec { - type Item = Message; +impl Encoder for Codec { type Error = ProtocolError; fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { diff --git a/actix-http/src/ws/dispatcher.rs b/actix-http/src/ws/dispatcher.rs index 7a6b11b1867..b114217a0fa 100644 --- a/actix-http/src/ws/dispatcher.rs +++ b/actix-http/src/ws/dispatcher.rs @@ -4,16 +4,18 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoService, Service}; -use actix_utils::framed; +use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError}; use super::{Codec, Frame, Message}; +#[pin_project::pin_project] pub struct Dispatcher where S: Service + 'static, T: AsyncRead + AsyncWrite, { - inner: framed::Dispatcher, + #[pin] + inner: InnerDispatcher, } impl Dispatcher @@ -25,13 +27,13 @@ where { pub fn new>(io: T, service: F) -> Self { Dispatcher { - inner: framed::Dispatcher::new(Framed::new(io, Codec::new()), service), + inner: InnerDispatcher::new(Framed::new(io, Codec::new()), service), } } pub fn with>(framed: Framed, service: F) -> Self { Dispatcher { - inner: framed::Dispatcher::new(framed, service), + inner: InnerDispatcher::new(framed, service), } } } @@ -43,9 +45,9 @@ where S::Future: 'static, S::Error: 'static, { - type Output = Result<(), framed::DispatcherError>; + type Output = Result<(), DispatcherError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).poll(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) } } diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 618d653f5f2..7fbe1543aec 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -152,7 +152,7 @@ where let (head, framed) = connection.open_tunnel(RequestHeadType::from(head)).await?; - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } @@ -186,7 +186,7 @@ where .open_tunnel(RequestHeadType::Rc(head, extra_headers)) .await?; - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 6ad660c4112..8b01e571650 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -366,7 +366,7 @@ impl WebsocketsRequest { // response and ws framed Ok(( ClientResponse::new(head, Payload::None), - framed.map_codec(|_| { + framed.into_map_codec(|_| { if server_mode { ws::Codec::new().max_size(max_size) } else {