Skip to content

Commit

Permalink
add lines codec
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Apr 16, 2021
1 parent fdafc1d commit f26730a
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 1 deletion.
10 changes: 9 additions & 1 deletion actix-codec/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "actix-codec"
version = "0.4.0-beta.1"
version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -21,6 +21,14 @@ bytes = "1"
futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
memchr = "2.3"
pin-project-lite = "0.2"
tokio = "1"
tokio-util = { version = "0.6", features = ["codec", "io"] }

[dev-dependencies]
criterion = { version = "0.3", features = ["html_reports"] }

[[bench]]
name = "lines"
harness = false
33 changes: 33 additions & 0 deletions actix-codec/benches/lines.rs
@@ -0,0 +1,33 @@
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, Criterion};

const INPUT: &str = "line 1\nline 2\r\nline 3\n\r\n\r";

fn bench_lines_codec(c: &mut Criterion) {
let mut group = c.benchmark_group("lines codec");

group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Decoder as _;

let mut codec = actix_codec::LinesCodec;
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});

group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Decoder as _;

let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});

group.finish();
}

criterion_group!(benches, bench_lines_codec);
criterion_main!(benches);
2 changes: 2 additions & 0 deletions actix-codec/src/lib.rs
Expand Up @@ -14,8 +14,10 @@

mod bcodec;
mod framed;
mod lines;

pub use self::bcodec::BytesCodec;
pub use self::lines::LinesCodec;
pub use self::framed::{Framed, FramedParts};

pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
Expand Down
112 changes: 112 additions & 0 deletions actix-codec/src/lines.rs
@@ -0,0 +1,112 @@
use std::io;

use bytes::{Buf, BytesMut};
use memchr::memchr;

use super::{Decoder, Encoder};

/// Bytes codec.
///
/// Reads/Writes chunks of bytes from a stream.
#[derive(Debug, Copy, Clone)]
pub struct LinesCodec;

impl Encoder<String> for LinesCodec {
type Error = io::Error;

#[inline]
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(item.as_ref());
Ok(())
}
}

impl Decoder for LinesCodec {
type Item = String;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
Ok(None)
} else {
let n = match memchr(b'\n', src.chunk()) {
Some(n) => n,
None => {
return Ok(None);
}
};

// split up to new line char
let mut buf = src.split_to(n);
let len = buf.len();

// remove new line char from source
src.advance(1);

match buf.last() {
Some(char) if *char == b'\r' => {
// remove one carriage returns at the end of buf
buf.truncate(len - 1);
}
None => return Ok(Some(String::new())),
_ => {}
}

to_utf8(buf)
}
}

fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(src)? {
Some(frame) => Ok(Some(frame)),
None if src.is_empty() => Ok(None),
None => {
let len = src.len();
let buf = if src[len - 1] == b'\r' {
src.split_to(len - 1)
} else {
src.split()
};

if buf.is_empty() {
return Ok(None);
}

return to_utf8(buf);
}
}
}
}

fn to_utf8(buf: BytesMut) -> io::Result<Option<String>> {
String::from_utf8(buf.to_vec())
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
.map(Some)
}

#[cfg(test)]
mod tests {
use bytes::BufMut as _;

use super::*;

#[test]
fn lines_decoder() {
let mut codec = LinesCodec;
let buf = &mut BytesMut::new();
buf.reserve(200);
buf.put_slice(b"\nline 1\nline 2\r\nline 3\n\r\n\r");
assert_eq!("", codec.decode(buf).unwrap().unwrap());
assert_eq!("line 1", codec.decode(buf).unwrap().unwrap());
assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
assert_eq!("line 3", codec.decode(buf).unwrap().unwrap());
assert_eq!("", codec.decode(buf).unwrap().unwrap());
assert_eq!(None, codec.decode(buf).unwrap());
assert_eq!(None, codec.decode_eof(buf).unwrap());
buf.put_slice(b"k");
assert_eq!(None, codec.decode(buf).unwrap());
assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap());
assert_eq!(None, codec.decode(buf).unwrap());
assert_eq!(None, codec.decode_eof(buf).unwrap());
}
}

0 comments on commit f26730a

Please sign in to comment.