Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nwalfield's streaming reader #122

Merged
merged 10 commits into from Mar 8, 2020
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -9,7 +9,7 @@ Made with CLion. Thanks to JetBrains for supporting open source!

It's base64. What more could anyone want?

This library's goals are to be *correct* and *fast*. It's thoroughly tested and widely used. It exposes functionality at multiple levels of abstraction so you can choose the level of convenience vs performance that you want, e.g. `decode_config_slice` decodes into an existing `&mut [u8]` and is pretty fast (2.6GiB/s for a 3 KiB input), whereas `decode_config` allocates a new `Vec<u8>` and returns it, which might be more convenient in some cases, but is slower (although still fast enough for most purposes) at 2.1 GiB/s.
This library's goals are to be *correct* and *fast*. It's thoroughly tested and widely used. It exposes functionality at multiple levels of abstraction so you can choose the level of convenience vs performance that you want, e.g. `decode_config_slice` decodes into an existing `&mut [u8]` and is pretty fast (2.6GiB/s for a 3 KiB input), whereas `decode_config` allocates a new `Vec<u8>` and returns it, which might be more convenient in some cases, but is slower (although still fast enough for almost any purpose) at 2.1 GiB/s.

Example
---
Expand Down
4 changes: 3 additions & 1 deletion RELEASE-NOTES.md
@@ -1,6 +1,8 @@
# Next

- 0.11.0
- A `Read` implementation (`DecoderReader`) to let users transparently decoded data from a b64 input source

# 0.11.0
- Minimum rust version 1.34.0
- `no_std` is now supported via the two new features `alloc` and `std`.

Expand Down
21 changes: 20 additions & 1 deletion benches/benchmarks.rs
Expand Up @@ -11,7 +11,7 @@ use base64::{

use criterion::{black_box, Bencher, Criterion, ParameterizedBenchmark, Throughput};
use rand::{FromEntropy, Rng};
use std::io::Write;
use std::io::{self, Read, Write};

const TEST_CONFIG: Config = base64::STANDARD;

Expand Down Expand Up @@ -52,6 +52,24 @@ fn do_decode_bench_slice(b: &mut Bencher, &size: &usize) {
});
}

fn do_decode_bench_stream(b: &mut Bencher, &size: &usize) {
let mut v: Vec<u8> = Vec::with_capacity(size * 3 / 4);
fill(&mut v);
let encoded = encode(&v);

let mut buf = Vec::new();
buf.resize(size, 0);
buf.truncate(0);

b.iter(|| {
let mut cursor = io::Cursor::new(&encoded[..]);
let mut decoder = base64::read::DecoderReader::new(&mut cursor, TEST_CONFIG);
decoder.read_to_end(&mut buf).unwrap();
buf.clear();
black_box(&buf);
});
}

fn do_encode_bench(b: &mut Bencher, &size: &usize) {
let mut v: Vec<u8> = Vec::with_capacity(size);
fill(&mut v);
Expand Down Expand Up @@ -138,6 +156,7 @@ fn decode_benchmarks(byte_sizes: &[usize]) -> ParameterizedBenchmark<usize> {
.throughput(|s| Throughput::Bytes(*s as u64))
.with_function("decode_reuse_buf", do_decode_bench_reuse_buf)
.with_function("decode_slice", do_decode_bench_slice)
.with_function("decode_stream", do_decode_bench_stream)
}

fn bench(c: &mut Criterion) {
Expand Down
10 changes: 10 additions & 0 deletions src/lib.rs
Expand Up @@ -43,6 +43,14 @@
//! Input can be invalid because it has invalid characters or invalid padding. (No padding at all is
//! valid, but excess padding is not.) Whitespace in the input is invalid.
//!
//! # `Read` and `Write`
//!
//! To map a `Read` of b64 bytes to the decoded bytes, wrap a reader (file, network socket, etc)
//! with `base64::read::DecoderReader`. To write raw bytes and have them b64 encoded on the fly,
//! wrap a writer with `base64::write::EncoderWriter`. There is some performance overhead (15% or
//! so) because of the necessary buffer shuffling -- still fast enough that almost nobody cares.
//! Also, these implementations do not heap allocate.
//!
//! # Panics
//!
//! If length calculations result in overflowing `usize`, a panic will result.
Expand Down Expand Up @@ -77,6 +85,8 @@ doctest!("../README.md");

mod chunked_encoder;
pub mod display;
#[cfg(any(feature = "std", test))]
pub mod read;
mod tables;
#[cfg(any(feature = "std", test))]
pub mod write;
Expand Down
282 changes: 282 additions & 0 deletions src/read/decoder.rs
@@ -0,0 +1,282 @@
use crate::{decode_config_slice, Config, DecodeError};
use std::io::Read;
use std::{cmp, fmt, io};

// This should be large, but it has to fit on the stack.
pub(crate) const BUF_SIZE: usize = 1024;

// 4 bytes of base64 data encode 3 bytes of raw data (modulo padding).
const BASE64_CHUNK_SIZE: usize = 4;
const DECODED_CHUNK_SIZE: usize = 3;

/// A `Read` implementation that decodes base64 data read from an underlying reader.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::io::Cursor;
///
/// // use a cursor as the simplest possible `Read` -- in real code this is probably a file, etc.
/// let mut wrapped_reader = Cursor::new(b"YXNkZg==");
/// let mut decoder = base64::read::DecoderReader::new(
/// &mut wrapped_reader, base64::STANDARD);
///
/// // handle errors as you normally would
/// let mut result = Vec::new();
/// decoder.read_to_end(&mut result).unwrap();
///
/// assert_eq!(b"asdf", &result[..]);
///
/// ```
pub struct DecoderReader<'a, R: 'a + io::Read> {
config: Config,
/// Where b64 data is read from
r: &'a mut R,

// Holds b64 data read from the delegate reader.
b64_buffer: [u8; BUF_SIZE],
// The start of the pending buffered data in b64_buffer.
b64_offset: usize,
// The amount of buffered b64 data.
b64_len: usize,
// Since the caller may provide us with a buffer of size 1 or 2 that's too small to copy a
// decoded chunk in to, we have to be able to hang on to a few decoded bytes.
// Technically we only need to hold 2 bytes but then we'd need a separate temporary buffer to
// decode 3 bytes into and then juggle copying one byte into the provided read buf and the rest
// into here, which seems like a lot of complexity for 1 extra byte of storage.
decoded_buffer: [u8; 3],
// index of start of decoded data
decoded_offset: usize,
// length of decoded data
decoded_len: usize,
// used to provide accurate offsets in errors
total_b64_decoded: usize,
}

impl<'a, R: io::Read> fmt::Debug for DecoderReader<'a, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DecoderReader")
.field("config", &self.config)
.field("b64_offset", &self.b64_offset)
.field("b64_len", &self.b64_len)
.field("decoded_buffer", &self.decoded_buffer)
.field("decoded_offset", &self.decoded_offset)
.field("decoded_len", &self.decoded_len)
.field("total_b64_decoded", &self.total_b64_decoded)
.finish()
}
}

impl<'a, R: io::Read> DecoderReader<'a, R> {
/// Create a new decoder that will read from the provided reader `r`.
pub fn new(r: &'a mut R, config: Config) -> Self {
DecoderReader {
config,
r,
b64_buffer: [0; BUF_SIZE],
b64_offset: 0,
b64_len: 0,
decoded_buffer: [0; DECODED_CHUNK_SIZE],
decoded_offset: 0,
decoded_len: 0,
total_b64_decoded: 0,
}
}

/// Write as much as possible of the decoded buffer into the target buffer.
/// Must only be called when there is something to write and space to write into.
/// Returns a Result with the number of (decoded) bytes copied.
fn flush_decoded_buf(&mut self, buf: &mut [u8]) -> io::Result<usize> {
debug_assert!(self.decoded_len > 0);
debug_assert!(buf.len() > 0);

let copy_len = cmp::min(self.decoded_len, buf.len());
debug_assert!(copy_len > 0);
debug_assert!(copy_len <= self.decoded_len);

buf[..copy_len].copy_from_slice(
&self.decoded_buffer[self.decoded_offset..self.decoded_offset + copy_len],
);

self.decoded_offset += copy_len;
self.decoded_len -= copy_len;

debug_assert!(self.decoded_len < DECODED_CHUNK_SIZE);

Ok(copy_len)
}

/// Read into the remaining space in the buffer after the current contents.
/// Must only be called when there is space to read into in the buffer.
/// Returns the number of bytes read.
fn read_from_delegate(&mut self) -> io::Result<usize> {
debug_assert!(self.b64_offset + self.b64_len < BUF_SIZE);

let read = self
.r
.read(&mut self.b64_buffer[self.b64_offset + self.b64_len..])?;
self.b64_len += read;

debug_assert!(self.b64_offset + self.b64_len <= BUF_SIZE);

return Ok(read);
}

/// Decode the requested number of bytes from the b64 buffer into the provided buffer. It's the
/// caller's responsibility to choose the number of b64 bytes to decode correctly.
///
/// Returns a Result with the number of decoded bytes written to `buf`.
fn decode_to_buf(&mut self, num_bytes: usize, buf: &mut [u8]) -> io::Result<usize> {
debug_assert!(self.b64_len >= num_bytes);
debug_assert!(self.b64_offset + self.b64_len <= BUF_SIZE);
debug_assert!(buf.len() > 0);

let decoded = decode_config_slice(
&self.b64_buffer[self.b64_offset..self.b64_offset + num_bytes],
self.config,
&mut buf[..],
)
.map_err(|e| match e {
DecodeError::InvalidByte(offset, byte) => {
DecodeError::InvalidByte(self.total_b64_decoded + offset, byte)
}
DecodeError::InvalidLength => DecodeError::InvalidLength,
DecodeError::InvalidLastSymbol(offset, byte) => {
DecodeError::InvalidLastSymbol(self.total_b64_decoded + offset, byte)
}
})
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

self.total_b64_decoded += num_bytes;
self.b64_offset += num_bytes;
self.b64_len -= num_bytes;

debug_assert!(self.b64_offset + self.b64_len <= BUF_SIZE);

Ok(decoded)
}
}

impl<'a, R: Read> Read for DecoderReader<'a, R> {
/// Decode input from the wrapped reader.
///
/// Under non-error circumstances, this returns `Ok` with the value being the number of bytes
/// written in `buf`.
///
/// Where possible, this function buffers base64 to minimize the number of read() calls to the
/// delegate reader.
///
/// # Errors
///
/// Any errors emitted by the delegate reader are returned. Decoding errors due to invalid
/// base64 are also possible, and will have `io::ErrorKind::InvalidData`.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.len() == 0 {
return Ok(0);
}

// offset == BUF_SIZE when we copied it all last time
debug_assert!(self.b64_offset <= BUF_SIZE);
debug_assert!(self.b64_offset + self.b64_len <= BUF_SIZE);
debug_assert!(if self.b64_offset == BUF_SIZE {
self.b64_len == 0
} else {
self.b64_len <= BUF_SIZE
});

debug_assert!(if self.decoded_len == 0 {
// can be = when we were able to copy the complete chunk
self.decoded_offset <= DECODED_CHUNK_SIZE
} else {
self.decoded_offset < DECODED_CHUNK_SIZE
});

// We shouldn't ever decode into here when we can't immediately write at least one byte into
// the provided buf, so the effective length should only be 3 momentarily between when we
// decode and when we copy into the target buffer.
debug_assert!(self.decoded_len < DECODED_CHUNK_SIZE);
debug_assert!(self.decoded_len + self.decoded_offset <= DECODED_CHUNK_SIZE);

if self.decoded_len > 0 {
// we have a few leftover decoded bytes; flush that rather than pull in more b64
self.flush_decoded_buf(buf)
} else {
let mut at_eof = false;
while self.b64_len < BASE64_CHUNK_SIZE {
// Work around lack of copy_within, which is only present in 1.37
// Copy any bytes we have to the start of the buffer.
// We know we have < 1 chunk, so we can use a tiny tmp buffer.
let mut memmove_buf = [0_u8; BASE64_CHUNK_SIZE];
memmove_buf[..self.b64_len].copy_from_slice(
&self.b64_buffer[self.b64_offset..self.b64_offset + self.b64_len],
);
self.b64_buffer[0..self.b64_len].copy_from_slice(&memmove_buf[..self.b64_len]);
self.b64_offset = 0;

// then fill in more data
let read = self.read_from_delegate()?;
if read == 0 {
// we never pass in an empty buf, so 0 => we've hit EOF
at_eof = true;
break;
}
}

if self.b64_len == 0 {
debug_assert!(at_eof);
// we must be at EOF, and we have no data left to decode
return Ok(0);
};

debug_assert!(if at_eof {
// if we are at eof, we may not have a complete chunk
self.b64_len > 0
} else {
// otherwise, we must have at least one chunk
self.b64_len >= BASE64_CHUNK_SIZE
});

debug_assert_eq!(0, self.decoded_len);

if buf.len() < DECODED_CHUNK_SIZE {
// caller requested an annoyingly short read
// have to write to a tmp buf first to avoid double mutable borrow
let mut decoded_chunk = [0_u8; DECODED_CHUNK_SIZE];
// if we are at eof, could have less than BASE64_CHUNK_SIZE, in which case we have
// to assume that these last few tokens are, in fact, valid (i.e. must be 2-4 b64
// tokens, not 1, since 1 token can't decode to 1 byte).
let to_decode = cmp::min(self.b64_len, BASE64_CHUNK_SIZE);

let decoded = self.decode_to_buf(to_decode, &mut decoded_chunk[..])?;
self.decoded_buffer[..decoded].copy_from_slice(&decoded_chunk[..decoded]);

self.decoded_offset = 0;
self.decoded_len = decoded;

// can be less than 3 on last block due to padding
debug_assert!(decoded <= 3);

self.flush_decoded_buf(buf)
} else {
let b64_bytes_that_can_decode_into_buf = (buf.len() / DECODED_CHUNK_SIZE)
.checked_mul(BASE64_CHUNK_SIZE)
.expect("too many chunks");
debug_assert!(b64_bytes_that_can_decode_into_buf >= BASE64_CHUNK_SIZE);

let b64_bytes_available_to_decode = if at_eof {
self.b64_len
} else {
// only use complete chunks
self.b64_len - self.b64_len % 4
};

let actual_decode_len = cmp::min(
b64_bytes_that_can_decode_into_buf,
b64_bytes_available_to_decode,
);
self.decode_to_buf(actual_decode_len, buf)
}
}
}
}