Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate with updated actix-{codec, utils} #1634

Merged
merged 1 commit into from Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions CHANGES.md
@@ -1,14 +1,19 @@
# Changes

## Unreleased
## Unreleased - 2020-xx-xx
### Added
* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing slash,
or as the new addition, always trimming trailing slashes.
* `middleware::NormalizePath` now has configurable behaviour for either always having a trailing
slash, or as the new addition, always trimming trailing slashes.

### Changed
* Update actix-codec and actix-utils dependencies.


## 3.0.0-beta.3 - 2020-08-17
### Changed
* Update `rustls` to 0.18


## 3.0.0-beta.2 - 2020-08-17
### Changed
* `PayloadConfig` is now also considered in `Bytes` and `String` extractors when set
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Expand Up @@ -65,9 +65,9 @@ name = "test_server"
required-features = ["compress"]

[dependencies]
actix-codec = "0.2.0"
actix-service = "1.0.2"
actix-utils = "1.0.6"
actix-codec = "0.3.0"
actix-service = "1.0.6"
actix-utils = "2.0.0"
actix-router = "0.2.4"
actix-rt = "1.1.1"
actix-server = "1.0.0"
Expand Down
3 changes: 2 additions & 1 deletion actix-http/CHANGES.md
@@ -1,7 +1,8 @@
# Changes

## Unreleased

### Changed
* Update actix-codec and actix-utils dependencies.

## [2.0.0-beta.3] - 2020-08-14

Expand Down
4 changes: 2 additions & 2 deletions actix-http/Cargo.toml
Expand Up @@ -41,9 +41,9 @@ actors = ["actix"]

[dependencies]
actix-service = "1.0.5"
actix-codec = "0.2.0"
actix-codec = "0.3.0"
actix-connect = "2.0.0-alpha.4"
actix-utils = "1.0.6"
actix-utils = "2.0.0"
actix-rt = "1.0.0"
actix-threadpool = "0.3.1"
actix-tls = { version = "2.0.0-alpha.2", optional = true }
Expand Down
12 changes: 8 additions & 4 deletions actix-http/src/client/connection.rs
Expand Up @@ -46,10 +46,10 @@ pub trait Connection {

pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
/// Close connection
fn close(&mut self);
fn close(self: Pin<&mut Self>);

/// Release connection to the connection pool
fn release(&mut self);
fn release(self: Pin<&mut Self>);
}

#[doc(hidden)]
Expand Down Expand Up @@ -195,11 +195,15 @@ where
match self {
EitherConnection::A(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
.map(|res| {
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A)))
})
.boxed_local(),
EitherConnection::B(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
.map(|res| {
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B)))
})
.boxed_local(),
}
}
Expand Down
65 changes: 35 additions & 30 deletions actix-http/src/client/h1proto.rs
Expand Up @@ -67,32 +67,32 @@ where
};

// create Framed and send request
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, body.size()).into()).await?;
let mut framed_inner = Framed::new(io, h1::ClientCodec::default());
framed_inner.send((head, body.size()).into()).await?;

// send request body
match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
_ => send_body(body, &mut framed).await?,
_ => send_body(body, Pin::new(&mut framed_inner)).await?,
};

// read response and init read body
let res = framed.into_future().await;
let res = Pin::new(&mut framed_inner).into_future().await;
let (head, framed) = if let (Some(result), framed) = res {
let item = result.map_err(SendRequestError::from)?;
(item, framed)
} else {
return Err(SendRequestError::from(ConnectError::Disconnected));
};

match framed.get_codec().message_type() {
match framed.codec_ref().message_type() {
h1::MessageType::None => {
let force_close = !framed.get_codec().keepalive();
let force_close = !framed.codec_ref().keepalive();
release_connection(framed, force_close);
Ok((head, Payload::None))
}
_ => {
let pl: PayloadStream = PlStream::new(framed).boxed_local();
let pl: PayloadStream = PlStream::new(framed_inner).boxed_local();
Ok((head, pl.into()))
}
}
Expand All @@ -119,35 +119,36 @@ where
}

/// send request body to the peer
pub(crate) async fn send_body<I, B>(
pub(crate) async fn send_body<T, B>(
body: B,
framed: &mut Framed<I, h1::ClientCodec>,
mut framed: Pin<&mut Framed<T, h1::ClientCodec>>,
) -> Result<(), SendRequestError>
where
I: ConnectionLifetime,
T: ConnectionLifetime + Unpin,
B: MessageBody,
{
let mut eof = false;
pin_mut!(body);

let mut eof = false;
while !eof {
while !eof && !framed.is_write_buf_full() {
while !eof && !framed.as_ref().is_write_buf_full() {
match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
Some(result) => {
framed.write(h1::Message::Chunk(Some(result?)))?;
framed.as_mut().write(h1::Message::Chunk(Some(result?)))?;
}
None => {
eof = true;
framed.write(h1::Message::Chunk(None))?;
framed.as_mut().write(h1::Message::Chunk(None))?;
}
}
}

if !framed.is_write_buf_empty() {
poll_fn(|cx| match framed.flush(cx) {
if !framed.as_ref().is_write_buf_empty() {
poll_fn(|cx| match framed.as_mut().flush(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
if !framed.is_write_buf_full() {
if !framed.as_ref().is_write_buf_full() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
Expand All @@ -158,13 +159,14 @@ where
}
}

SinkExt::flush(framed).await?;
SinkExt::flush(Pin::into_inner(framed)).await?;
Ok(())
}

#[doc(hidden)]
/// HTTP client connection
pub struct H1Connection<T> {
/// T should be `Unpin`
io: Option<T>,
created: time::Instant,
pool: Option<Acquired<T>>,
Expand All @@ -175,7 +177,7 @@ where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
/// Close connection
fn close(&mut self) {
fn close(mut self: Pin<&mut Self>) {
if let Some(mut pool) = self.pool.take() {
if let Some(io) = self.io.take() {
pool.close(IoConnection::new(
Expand All @@ -188,7 +190,7 @@ where
}

/// Release this connection to the connection pool
fn release(&mut self) {
fn release(mut self: Pin<&mut Self>) {
if let Some(mut pool) = self.pool.take() {
if let Some(io) = self.io.take() {
pool.release(IoConnection::new(
Expand Down Expand Up @@ -242,14 +244,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T>
}
}

#[pin_project::pin_project]
pub(crate) struct PlStream<Io> {
#[pin]
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
}

impl<Io: ConnectionLifetime> PlStream<Io> {
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());

PlStream {
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
framed: Some(framed),
}
}
}
Expand All @@ -261,16 +267,16 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut this = self.project();

match this.framed.as_mut().unwrap().next_item(cx)? {
match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(chunk)) => {
if let Some(chunk) = chunk {
Poll::Ready(Some(Ok(chunk)))
} else {
let framed = this.framed.take().unwrap();
let force_close = !framed.get_codec().keepalive();
let framed = this.framed.as_mut().as_pin_mut().unwrap();
let force_close = !framed.codec_ref().keepalive();
release_connection(framed, force_close);
Poll::Ready(None)
}
Expand All @@ -280,14 +286,13 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
}
}

fn release_connection<T, U>(framed: Framed<T, U>, force_close: bool)
fn release_connection<T, U>(framed: Pin<&mut Framed<T, U>>, force_close: bool)
where
T: ConnectionLifetime,
{
let mut parts = framed.into_parts();
if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() {
parts.io.release()
if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() {
framed.io_pin().release()
} else {
parts.io.close()
framed.io_pin().close()
}
}
7 changes: 4 additions & 3 deletions actix-http/src/error.rs
@@ -1,4 +1,5 @@
//! Error and Result module

use std::cell::RefCell;
use std::io::Write;
use std::str::Utf8Error;
Expand All @@ -7,7 +8,7 @@ use std::{fmt, io, result};

use actix_codec::{Decoder, Encoder};
pub use actix_threadpool::BlockingError;
use actix_utils::framed::DispatcherError as FramedDispatcherError;
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut;
use derive_more::{Display, From};
Expand Down Expand Up @@ -452,10 +453,10 @@ impl ResponseError for ContentTypeError {
}
}

impl<E, U: Encoder + Decoder> ResponseError for FramedDispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> ResponseError for FramedDispatcherError<E, U, I>
where
E: fmt::Debug + fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
}
Expand Down
5 changes: 2 additions & 3 deletions actix-http/src/h1/client.rs
Expand Up @@ -173,13 +173,12 @@ impl Decoder for ClientPayloadCodec {
}
}

impl Encoder for ClientCodec {
type Item = Message<(RequestHeadType, BodySize)>;
impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
type Error = io::Error;

fn encode(
&mut self,
item: Self::Item,
item: Message<(RequestHeadType, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Expand Down
5 changes: 2 additions & 3 deletions actix-http/src/h1/codec.rs
Expand Up @@ -144,13 +144,12 @@ impl Decoder for Codec {
}
}

impl Encoder for Codec {
type Item = Message<(Response<()>, BodySize)>;
impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
type Error = io::Error;

fn encode(
&mut self,
item: Self::Item,
item: Message<(Response<()>, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Expand Down
16 changes: 10 additions & 6 deletions actix-http/src/h1/service.rs
Expand Up @@ -548,10 +548,12 @@ where
}

#[doc(hidden)]
#[pin_project::pin_project]
pub struct OneRequestServiceResponse<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
#[pin]
framed: Option<Framed<T, Codec>>,
}

Expand All @@ -562,16 +564,18 @@ where
type Output = Result<(Request, Framed<T, Codec>), ParseError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.framed.as_mut().unwrap().next_item(cx) {
Poll::Ready(Some(Ok(req))) => match req {
let this = self.as_mut().project();

match ready!(this.framed.as_pin_mut().unwrap().next_item(cx)) {
Some(Ok(req)) => match req {
Message::Item(req) => {
Poll::Ready(Ok((req, self.framed.take().unwrap())))
let mut this = self.as_mut().project();
Poll::Ready(Ok((req, this.framed.take().unwrap())))
}
Message::Chunk(_) => unreachable!("Something is wrong"),
},
Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
Poll::Pending => Poll::Pending,
Some(Err(err)) => Poll::Ready(Err(err)),
None => Poll::Ready(Err(ParseError::Incomplete)),
}
}
}