Skip to content

Commit

Permalink
solve framed integration
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Aug 16, 2020
1 parent 5d28be9 commit b07ee29
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 78 deletions.
134 changes: 66 additions & 68 deletions actix-codec/src/framed.rs
Expand Up @@ -23,6 +23,12 @@ bitflags::bitflags! {

/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
/// traits to handle encoding and decoding of message frames. Note that
/// the incoming and outgoing frame types may be distinct.
#[pin_project]
pub struct Framed<T, U> {
#[pin]
Expand All @@ -38,15 +44,6 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
Expand All @@ -63,40 +60,13 @@ where
}

impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These
/// field can be obtained from an existing `Framed` with the
/// `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}

/// Returns a reference to the underlying codec.
pub fn get_codec(&self) -> &U {
pub fn codec_ref(&self) -> &U {
&self.codec
}

/// Returns a mutable reference to the underlying codec.
pub fn get_codec_mut(&mut self) -> &mut U {
pub fn codec_mut(&mut self) -> &mut U {
&mut self.codec
}

Expand All @@ -106,20 +76,29 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
pub fn io_ref(&self) -> &T {
&self.io
}

/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Frame`.
/// Returns a mutable reference to the underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
pub fn io_mut(&mut self) -> &mut T {
&mut self.io
}

/// Returns a `Pin` of a mutable reference to the underlying I/O stream.
pub fn io_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().io
}

/// Check if read buffer is empty.
pub fn is_read_buf_empty(&self) -> bool {
self.read_buf.is_empty()
}

/// Check if write buffer is empty.
pub fn is_write_buf_empty(&self) -> bool {
self.write_buf.is_empty()
Expand All @@ -130,8 +109,15 @@ impl<T, U> Framed<T, U> {
self.write_buf.len() >= HW
}

/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
pub fn replace_codec<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
Expand All @@ -142,7 +128,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
pub fn into_map_io<F, T2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
Expand All @@ -156,7 +142,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
pub fn into_map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
Expand All @@ -168,22 +154,6 @@ impl<T, U> Framed<T, U> {
write_buf: self.write_buf,
}
}

/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}

impl<T, U> Framed<T, U> {
Expand All @@ -203,13 +173,6 @@ impl<T, U> Framed<T, U> {
Ok(())
}

/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}

/// Try to read underlying I/O stream and decode item.
pub fn next_item(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -376,6 +339,41 @@ where
}
}

impl<T, U> Framed<T, U> {
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// These objects take a stream, a read buffer and a write buffer. These
/// fields can be obtained from an existing `Framed` with the `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}

/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}

/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport.
Expand Down
4 changes: 3 additions & 1 deletion actix-codec/src/lib.rs
Expand Up @@ -8,13 +8,15 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream

#![deny(rust_2018_idioms)]
#![warn(missing_docs)]

mod bcodec;
mod framed;

pub use self::bcodec::BytesCodec;
pub use self::framed::{Framed, FramedParts};
pub use self::framed::Framed;

pub use tokio::io::{AsyncRead, AsyncWrite};
pub use tokio_util::codec::{Decoder, Encoder};
2 changes: 1 addition & 1 deletion actix-utils/src/counter.rs
Expand Up @@ -7,7 +7,7 @@ use crate::task::LocalWaker;
#[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total ncount is shared across all clones.
/// Counter could be cloned, total n-count is shared across all clones.
pub struct Counter(Rc<CounterInner>);

struct CounterInner {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion actix-utils/src/inflight.rs
Expand Up @@ -152,7 +152,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_newtransform() {
async fn test_new_transform() {
let wait_time = Duration::from_millis(50);

let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
Expand Down
3 changes: 2 additions & 1 deletion actix-utils/src/lib.rs
@@ -1,11 +1,12 @@
//! Actix utils - various helper services

#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]

pub mod condition;
pub mod counter;
pub mod either;
pub mod framed;
pub mod dispatcher;
pub mod inflight;
pub mod keepalive;
pub mod mpsc;
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/oneshot.rs
Expand Up @@ -170,7 +170,7 @@ pub struct PReceiver<T> {
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}

// The oneshots do not ever project Pin to the inner T
// The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {}

Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/order.rs
Expand Up @@ -231,7 +231,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_inorder() {
async fn test_in_order() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/task.rs
Expand Up @@ -36,7 +36,7 @@ impl LocalWaker {

#[inline]
/// Check if waker has been registered.
pub fn is_registed(&self) -> bool {
pub fn is_registered(&self) -> bool {
unsafe { (*self.waker.get()).is_some() }
}

Expand Down
4 changes: 2 additions & 2 deletions actix-utils/src/time.rs
Expand Up @@ -173,7 +173,7 @@ mod tests {
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test]
async fn lowres_time_service_time_does_not_immediately_change() {
async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
Expand Down Expand Up @@ -210,7 +210,7 @@ mod tests {
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/timeout.rs
Expand Up @@ -223,7 +223,7 @@ mod tests {
}

#[actix_rt::test]
async fn test_timeout_newservice() {
async fn test_timeout_new_service() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);

Expand Down

0 comments on commit b07ee29

Please sign in to comment.