Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solve framed integration with actix-http #179

Merged
merged 4 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions actix-codec/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
* Upgrade `tokio-util` to `0.3`.
* Improve `BytesCodec` `.encode()` performance
* Simplify `BytesCodec` `.decode()`
* Rename methods on `Framed` to better describe their use.
* Add method on `Framed` to get a pinned reference to the underlying I/O.
* Add method on `Framed` check emptiness of read buffer.

## [0.2.0] - 2019-12-10

Expand Down
134 changes: 66 additions & 68 deletions actix-codec/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ bitflags::bitflags! {

/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
/// traits to handle encoding and decoding of message frames. Note that
/// the incoming and outgoing frame types may be distinct.
#[pin_project]
pub struct Framed<T, U> {
#[pin]
Expand All @@ -38,15 +44,6 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
Expand All @@ -63,40 +60,13 @@ where
}

impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These
/// field can be obtained from an existing `Framed` with the
/// `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}

/// Returns a reference to the underlying codec.
pub fn get_codec(&self) -> &U {
pub fn codec_ref(&self) -> &U {
&self.codec
}

/// Returns a mutable reference to the underlying codec.
pub fn get_codec_mut(&mut self) -> &mut U {
pub fn codec_mut(&mut self) -> &mut U {
&mut self.codec
}

Expand All @@ -106,20 +76,29 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
pub fn io_ref(&self) -> &T {
&self.io
}

/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Frame`.
/// Returns a mutable reference to the underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
pub fn io_mut(&mut self) -> &mut T {
&mut self.io
}

/// Returns a `Pin` of a mutable reference to the underlying I/O stream.
pub fn io_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().io
}

/// Check if read buffer is empty.
pub fn is_read_buf_empty(&self) -> bool {
self.read_buf.is_empty()
}

/// Check if write buffer is empty.
pub fn is_write_buf_empty(&self) -> bool {
self.write_buf.is_empty()
Expand All @@ -130,8 +109,15 @@ impl<T, U> Framed<T, U> {
self.write_buf.len() >= HW
}

/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
pub fn replace_codec<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
Expand All @@ -142,7 +128,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
pub fn into_map_io<F, T2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
Expand All @@ -156,7 +142,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
pub fn into_map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
Expand All @@ -168,22 +154,6 @@ impl<T, U> Framed<T, U> {
write_buf: self.write_buf,
}
}

/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}

impl<T, U> Framed<T, U> {
Expand All @@ -203,13 +173,6 @@ impl<T, U> Framed<T, U> {
Ok(())
}

/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}

/// Try to read underlying I/O stream and decode item.
pub fn next_item(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -376,6 +339,41 @@ where
}
}

impl<T, U> Framed<T, U> {
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// These objects take a stream, a read buffer and a write buffer. These
/// fields can be obtained from an existing `Framed` with the `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}

/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}

/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport.
Expand Down
2 changes: 2 additions & 0 deletions actix-codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream

#![deny(rust_2018_idioms)]
#![warn(missing_docs)]

mod bcodec;
mod framed;
Expand Down
1 change: 1 addition & 0 deletions actix-utils/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Upgrade `tokio-util` to `0.3`.
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
* Rename method to correctly spelled `LocalWaker::is_registered`.

## [1.0.6] - 2020-01-08

Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::task::LocalWaker;
#[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total ncount is shared across all clones.
/// Counter could be cloned, total n-count is shared across all clones.
pub struct Counter(Rc<CounterInner>);

struct CounterInner {
Expand Down
2 changes: 2 additions & 0 deletions actix-utils/src/framed.rs → actix-utils/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Framed dispatcher service and related utilities

#![allow(type_alias_bounds)]

use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/inflight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_newtransform() {
async fn test_new_transform() {
let wait_time = Duration::from_millis(50);

let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
Expand Down
3 changes: 2 additions & 1 deletion actix-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Actix utils - various helper services

#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]

pub mod condition;
pub mod counter;
pub mod dispatcher;
pub mod either;
pub mod framed;
pub mod inflight;
pub mod keepalive;
pub mod mpsc;
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub struct PReceiver<T> {
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}

// The oneshots do not ever project Pin to the inner T
// The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {}

Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_inorder() {
async fn test_in_order() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl LocalWaker {

#[inline]
/// Check if waker has been registered.
pub fn is_registed(&self) -> bool {
pub fn is_registered(&self) -> bool {
unsafe { (*self.waker.get()).is_some() }
}

Expand Down
4 changes: 2 additions & 2 deletions actix-utils/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ mod tests {
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test]
async fn lowres_time_service_time_does_not_immediately_change() {
async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
Expand Down Expand Up @@ -210,7 +210,7 @@ mod tests {
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_timeout_newservice() {
async fn test_timeout_new_service() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);

Expand Down