From f26730a584f191a05829d824bd9c0edc3f6c1d64 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 16 Apr 2021 05:51:30 +0100 Subject: [PATCH] add lines codec --- actix-codec/Cargo.toml | 10 +++- actix-codec/benches/lines.rs | 33 +++++++++++ actix-codec/src/lib.rs | 2 + actix-codec/src/lines.rs | 112 +++++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 actix-codec/benches/lines.rs create mode 100644 actix-codec/src/lines.rs diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 95a2476409..bf5c69cdb3 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-codec" -version = "0.4.0-beta.1" +version = "0.4.0" authors = ["Nikolay Kim "] description = "Codec utilities for working with framed protocols" keywords = ["network", "framework", "async", "futures"] @@ -21,6 +21,14 @@ bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } log = "0.4" +memchr = "2.3" pin-project-lite = "0.2" tokio = "1" tokio-util = { version = "0.6", features = ["codec", "io"] } + +[dev-dependencies] +criterion = { version = "0.3", features = ["html_reports"] } + +[[bench]] +name = "lines" +harness = false diff --git a/actix-codec/benches/lines.rs b/actix-codec/benches/lines.rs new file mode 100644 index 0000000000..15c29b8f9c --- /dev/null +++ b/actix-codec/benches/lines.rs @@ -0,0 +1,33 @@ +use bytes::BytesMut; +use criterion::{criterion_group, criterion_main, Criterion}; + +const INPUT: &str = "line 1\nline 2\r\nline 3\n\r\n\r"; + +fn bench_lines_codec(c: &mut Criterion) { + let mut group = c.benchmark_group("lines codec"); + + group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Decoder as _; + + let mut codec = actix_codec::LinesCodec; + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Decoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + group.finish(); +} + +criterion_group!(benches, bench_lines_codec); +criterion_main!(benches); diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index c7713bfe4d..370c9bb73f 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -14,8 +14,10 @@ mod bcodec; mod framed; +mod lines; pub use self::bcodec::BytesCodec; +pub use self::lines::LinesCodec; pub use self::framed::{Framed, FramedParts}; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs new file mode 100644 index 0000000000..aa9734ff46 --- /dev/null +++ b/actix-codec/src/lines.rs @@ -0,0 +1,112 @@ +use std::io; + +use bytes::{Buf, BytesMut}; +use memchr::memchr; + +use super::{Decoder, Encoder}; + +/// Bytes codec. +/// +/// Reads/Writes chunks of bytes from a stream. +#[derive(Debug, Copy, Clone)] +pub struct LinesCodec; + +impl Encoder for LinesCodec { + type Error = io::Error; + + #[inline] + fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.extend_from_slice(item.as_ref()); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + Ok(None) + } else { + let n = match memchr(b'\n', src.chunk()) { + Some(n) => n, + None => { + return Ok(None); + } + }; + + // split up to new line char + let mut buf = src.split_to(n); + let len = buf.len(); + + // remove new line char from source + src.advance(1); + + match buf.last() { + Some(char) if *char == b'\r' => { + // remove one carriage returns at the end of buf + buf.truncate(len - 1); + } + None => return Ok(Some(String::new())), + _ => {} + } + + to_utf8(buf) + } + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.decode(src)? { + Some(frame) => Ok(Some(frame)), + None if src.is_empty() => Ok(None), + None => { + let len = src.len(); + let buf = if src[len - 1] == b'\r' { + src.split_to(len - 1) + } else { + src.split() + }; + + if buf.is_empty() { + return Ok(None); + } + + return to_utf8(buf); + } + } + } +} + +fn to_utf8(buf: BytesMut) -> io::Result> { + String::from_utf8(buf.to_vec()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) + .map(Some) +} + +#[cfg(test)] +mod tests { + use bytes::BufMut as _; + + use super::*; + + #[test] + fn lines_decoder() { + let mut codec = LinesCodec; + let buf = &mut BytesMut::new(); + buf.reserve(200); + buf.put_slice(b"\nline 1\nline 2\r\nline 3\n\r\n\r"); + assert_eq!("", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 1", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 2", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 3", codec.decode(buf).unwrap().unwrap()); + assert_eq!("", codec.decode(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); + buf.put_slice(b"k"); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); + } +}