Skip to content

Commit

Permalink
add lines codec (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Nov 5, 2021
1 parent 15279ea commit b2cef8f
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 5 deletions.
3 changes: 3 additions & 0 deletions actix-codec/CHANGES.md
@@ -1,6 +1,9 @@
# Changes

## Unreleased - 2021-xx-xx
* Added `LinesCodec.` [#338]

[#338]: https://github.com/actix/actix-net/pull/338


## 0.4.0 - 2021-04-20
Expand Down
13 changes: 12 additions & 1 deletion actix-codec/Cargo.toml
@@ -1,7 +1,10 @@
[package]
name = "actix-codec"
version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
repository = "https://github.com/actix/actix-net"
Expand All @@ -19,6 +22,14 @@ bytes = "1"
futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
memchr = "2.3"
pin-project-lite = "0.2"
tokio = "1.5.1"
tokio-util = { version = "0.6", features = ["codec", "io"] }

[dev-dependencies]
criterion = { version = "0.3", features = ["html_reports"] }

[[bench]]
name = "lines"
harness = false
57 changes: 57 additions & 0 deletions actix-codec/benches/lines.rs
@@ -0,0 +1,57 @@
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, Criterion};

const INPUT: &[u8] = include_bytes!("./lorem.txt");

fn bench_lines_codec(c: &mut Criterion) {
let mut decode_group = c.benchmark_group("lines decode");

decode_group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Decoder as _;

let mut codec = actix_codec::LinesCodec::default();
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});

decode_group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Decoder as _;

let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});

decode_group.finish();

let mut encode_group = c.benchmark_group("lines encode");

encode_group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Encoder as _;

let mut codec = actix_codec::LinesCodec::default();
let mut buf = BytesMut::new();
codec.encode("123", &mut buf).unwrap();
});
});

encode_group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Encoder as _;

let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::new();
codec.encode("123", &mut buf).unwrap();
});
});

encode_group.finish();
}

criterion_group!(benches, bench_lines_codec);
criterion_main!(benches);
5 changes: 5 additions & 0 deletions actix-codec/benches/lorem.txt
@@ -0,0 +1,5 @@
Lorem ipsum dolor sit amet, consectetur adipiscing elit. In tortor quam, pulvinar sit amet vestibulum eget, tincidunt non urna. Sed eu sem in felis malesuada venenatis. Suspendisse volutpat aliquet nisi, in condimentum nibh convallis id. Quisque gravida felis scelerisque ipsum aliquam consequat. Praesent libero odio, malesuada vitae odio quis, aliquam aliquet enim. In fringilla ut turpis nec pharetra. Duis eu posuere metus. Sed a aliquet massa. Mauris non tempus mi, quis mattis libero. Vivamus ornare ex at semper cursus. Vestibulum sed facilisis erat, aliquet mollis est. In interdum, magna iaculis ultricies elementum, mi ante vestibulum mauris, nec viverra turpis lorem quis ante. Proin in auctor erat. Vivamus dictum congue massa, fermentum bibendum leo pretium quis. Integer dapibus sodales ligula, sit amet imperdiet felis suscipit eu. Phasellus non ornare enim.
Nam feugiat neque sit amet hendrerit rhoncus. Nunc suscipit molestie vehicula. Aenean vulputate porttitor augue, sit amet molestie dolor volutpat vitae. Nulla vitae condimentum eros. Aliquam tristique purus at metus lacinia egestas. Cras euismod lorem eu orci lobortis, sed tincidunt nisl laoreet. Ut suscipit fermentum mi, et euismod tortor. Pellentesque vitae tempor quam, sed dignissim mi. Suspendisse luctus lacus vitae ligula blandit vehicula. Quisque interdum iaculis tincidunt. Nunc elementum mi vitae tempor placerat. Suspendisse potenti. Donec blandit laoreet ipsum, quis rhoncus velit vulputate sed.
Aliquam suscipit lectus eros, at maximus dolor efficitur quis. Integer blandit tortor orci, nec mattis nunc eleifend ac. Mauris pharetra vel quam quis lacinia. Duis lobortis condimentum nunc ut facilisis. Praesent arcu nisi, porta sit amet viverra sit amet, pellentesque ut nisi. Nunc gravida tortor eu ligula tempus, in interdum magna pretium. Fusce eu ornare sapien. Nullam pellentesque cursus eros. Nam orci massa, faucibus eget leo eget, elementum vulputate erat. Fusce vehicula augue et dui hendrerit vulputate. Mauris neque lacus, porttitor ut condimentum id, efficitur ac neque. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec accumsan, lectus fermentum elementum tristique, ipsum tortor mollis ante, non lacinia nibh ex quis sapien.
Donec pharetra, elit eget rutrum luctus, urna ligula facilisis lorem, sit amet rhoncus ante est eu mi. Vestibulum vestibulum ultricies interdum. Nulla tincidunt ante non hendrerit venenatis. Curabitur vestibulum turpis erat, id efficitur quam venenatis eu. Fusce nulla sem, dapibus vel quam feugiat, ornare fermentum ligula. Praesent tempus tincidunt mauris, non pellentesque felis varius in. Aenean eu arcu ligula. Morbi dapibus maximus nulla a pharetra. Fusce leo metus, luctus ut cursus non, sollicitudin non lectus. Integer pellentesque eleifend erat, vel gravida purus tempus a. Mauris id vestibulum quam. Nunc vitae ullamcorper metus, pharetra placerat enim. Fusce in ultrices nisl. Curabitur justo mauris, dignissim in aliquam sit amet, sollicitudin ut risus. Cras tempor rutrum justo, non tincidunt est maximus at.
Aliquam ac velit tincidunt, ullamcorper velit sit amet, pulvinar nisi. Nullam rhoncus rhoncus egestas. Cras ac luctus nisi. Mauris sit amet risus at magna volutpat ultrices quis ac dui. Aliquam condimentum tellus purus, vel sagittis odio vulputate at. Sed ut finibus tellus. Aliquam tincidunt vehicula diam.
7 changes: 3 additions & 4 deletions actix-codec/src/bcodec.rs
@@ -1,11 +1,10 @@
use bytes::{Buf, Bytes, BytesMut};
use std::io;

use bytes::{Buf, Bytes, BytesMut};

use super::{Decoder, Encoder};

/// Bytes codec.
///
/// Reads/Writes chunks of bytes from a stream.
/// Bytes codec. Reads/writes chunks of bytes from a stream.
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;

Expand Down
2 changes: 2 additions & 0 deletions actix-codec/src/lib.rs
Expand Up @@ -14,9 +14,11 @@

mod bcodec;
mod framed;
mod lines;

pub use self::bcodec::BytesCodec;
pub use self::framed::{Framed, FramedParts};
pub use self::lines::LinesCodec;

pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub use tokio_util::codec::{Decoder, Encoder};
Expand Down
158 changes: 158 additions & 0 deletions actix-codec/src/lines.rs
@@ -0,0 +1,158 @@
use std::io;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use memchr::memchr;

use super::{Decoder, Encoder};

/// Lines codec. Reads/writes line delimited strings.
///
/// Will split input up by LF or CRLF delimiters. I.e. carriage return characters at the end of
/// lines are not preserved.
#[derive(Debug, Copy, Clone, Default)]
#[non_exhaustive]
pub struct LinesCodec;

impl<T: AsRef<str>> Encoder<T> for LinesCodec {
type Error = io::Error;

#[inline]
fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
let item = item.as_ref();
dst.reserve(item.len() + 1);
dst.put_slice(item.as_bytes());
dst.put_u8(b'\n');
Ok(())
}
}

impl Decoder for LinesCodec {
type Item = String;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None);
}

let len = match memchr(b'\n', src) {
Some(n) => n,
None => {
return Ok(None);
}
};

// split up to new line char
let mut buf = src.split_to(len);
debug_assert_eq!(len, buf.len());

// remove new line char from source
src.advance(1);

match buf.last() {
// remove carriage returns at the end of buf
Some(b'\r') => buf.truncate(len - 1),

// line is empty
None => return Ok(Some(String::new())),

_ => {}
}

try_into_utf8(buf.freeze())
}

fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(src)? {
Some(frame) => Ok(Some(frame)),
None if src.is_empty() => Ok(None),
None => {
let buf = match src.last() {
// if last line ends in a CR then take everything up to it
Some(b'\r') => src.split_to(src.len() - 1),

// take all bytes from source
_ => src.split(),
};

if buf.is_empty() {
return Ok(None);
}

try_into_utf8(buf.freeze())
}
}
}
}

// Attempts to convert bytes into a `String`.
fn try_into_utf8(buf: Bytes) -> io::Result<Option<String>> {
String::from_utf8(buf.to_vec())
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
.map(Some)
}

#[cfg(test)]
mod tests {
use bytes::BufMut as _;

use super::*;

#[test]
fn lines_decoder() {
let mut codec = LinesCodec::default();
let mut buf = BytesMut::from("\nline 1\nline 2\r\nline 3\n\r\n\r");

assert_eq!("", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 1", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 2", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 3", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("", codec.decode(&mut buf).unwrap().unwrap());
assert!(codec.decode(&mut buf).unwrap().is_none());
assert!(codec.decode_eof(&mut buf).unwrap().is_none());

buf.put_slice(b"k");
assert!(codec.decode(&mut buf).unwrap().is_none());
assert_eq!("\rk", codec.decode_eof(&mut buf).unwrap().unwrap());

assert!(codec.decode(&mut buf).unwrap().is_none());
assert!(codec.decode_eof(&mut buf).unwrap().is_none());
}

#[test]
fn lines_encoder() {
let mut codec = LinesCodec::default();

let mut buf = BytesMut::new();

codec.encode("", &mut buf).unwrap();
assert_eq!(&buf[..], b"\n");

codec.encode("test", &mut buf).unwrap();
assert_eq!(&buf[..], b"\ntest\n");

codec.encode("a\nb", &mut buf).unwrap();
assert_eq!(&buf[..], b"\ntest\na\nb\n");
}

#[test]
fn lines_encoder_no_overflow() {
let mut codec = LinesCodec::default();

let mut buf = BytesMut::new();
codec.encode("1234567", &mut buf).unwrap();
assert_eq!(&buf[..], b"1234567\n");

let mut buf = BytesMut::new();
codec.encode("12345678", &mut buf).unwrap();
assert_eq!(&buf[..], b"12345678\n");

let mut buf = BytesMut::new();
codec.encode("123456789111213", &mut buf).unwrap();
assert_eq!(&buf[..], b"123456789111213\n");

let mut buf = BytesMut::new();
codec.encode("1234567891112131", &mut buf).unwrap();
assert_eq!(&buf[..], b"1234567891112131\n");
}
}

0 comments on commit b2cef8f

Please sign in to comment.