Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow usage without Seek: RevReader approach #36

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
220 changes: 192 additions & 28 deletions src/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,19 +528,167 @@ impl BasePacketReader {
}
}

// TODO replace 'a with GAT as soon as they are stabilized
est31 marked this conversation as resolved.
Show resolved Hide resolved
// (https://github.com/rust-lang/rust/issues/44265)

/// A trait for [Read] implementations that allow reversible reads to a buffer.
/// After such a read, the reader's position can be reverted as with [Seek],
/// however only as long as it is still within the bounds of the previous
/// reversible read call. As such, this trait is to be viewed as a subset of
/// [Seek]. The lifetime `'a` is that of the reference passed to the object in
/// [RevRead::rev_read] in order to support depending [RevRead::Buf] on that
/// lifetime.
///
/// Usually, you will not need to implement this yourself. Instead, you can
/// rely either on the blanket implementation provided for all type that
est31 marked this conversation as resolved.
Show resolved Hide resolved
/// implement [Read] and [Seek] or use the [RevReader] to wrap a type that only
est31 marked this conversation as resolved.
Show resolved Hide resolved
/// implements [Read].
pub trait RevRead<'a>: Read {

/// The type of buffer returned by this reversible reader.
type Buf: AsRef<[u8]>;

/// Executes a reversible read hat returns a buffer with data read from the
/// current position in the reader. The buffer must be larger than 28 bytes
/// in size, but it does not have to be completely filled with data from
/// the reader. The second return value indicates the actual numbe of bytes
/// read from the reader.
///
/// # Arguments
///
/// * `amount`: An upper bound on the number of bytes that should be read
/// from the reader. This does not have to be an upper bound on the size of
/// the returned buffer.
///
/// # Errors
///
/// If reading fails according to [Read::read].
fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)>;

/// Reverts the previous execution of [RevRead::rev_read] by some amount,
/// i.e. the next read will yield previous data of that size again as if
/// [Seek::seek] was called with [SeekFrom::Current] of `-amount`. It is a
/// prerequisite to this function that the previous operation was
/// [RevRead::rev_read] and that that returned at least one byte of data,
/// otherwise reverting may or may not work.
///
/// # Arguments
///
/// * `amount`: The number of bytes by which to revert the previous read.
est31 marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Errors
///
/// If reverting by the given amount is unsupported given the current
/// situation.
fn revert(&mut self, amount: usize) -> io::Result<()>;
}

// The array's size is freely choosable, but must be > 27.
const REV_READ_BUF_SIZE: usize = 1024;

impl<'a, R: Read + Seek> RevRead<'a> for R {
type Buf = [u8; REV_READ_BUF_SIZE];

fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)> {
let amount = amount.min(REV_READ_BUF_SIZE);
let mut buf = [0; REV_READ_BUF_SIZE];
let count = self.read(&mut buf[..amount])?;

Ok((buf, count))
}

fn revert(&mut self, amount: usize) -> io::Result<()> {
self.seek(SeekFrom::Current(-(amount as i64)))?;

Ok(())
}
}

/// A reversible reader that wraps around a type implementing [Read] and offers
/// a [RevRead] implementation by storing read data in a local buffer. Use this
/// if you want to use the [PacketReader] with a type that does not implement
/// [Seek].
pub struct RevReader<R> {
/// The underlying reader.
read: R,
/// The buffer used to store the result of a [RevRead::rev_read] call.
buf: Box<[u8; REV_READ_BUF_SIZE]>,
/// The number of bytes in the buffer that are valid.
buf_len: usize,
/// The index of the next byte that should be read from the buffer.
buf_idx: usize
}

impl<R> RevReader<R> {

/// Creates a new reversible reader that wraps around the given `read`.
/// Note in order for the [RevRead] implementation to hold, `read` must
/// implement [Read].
pub fn new(read: R) -> RevReader<R> {
RevReader {
read,
buf: Box::new([0; REV_READ_BUF_SIZE]),
buf_len: 0,
buf_idx: 0
}
}
est31 marked this conversation as resolved.
Show resolved Hide resolved
}

impl<R: Read> Read for RevReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.buf_idx < self.buf_len {
// We are still within reverted amount => copy from internal buffer
let read_len = (self.buf_len - self.buf_idx).min(buf.len());
let read_data = &self.buf[self.buf_idx..(self.buf_idx + read_len)];

(&mut buf[..read_len]).copy_from_slice(read_data);
self.buf_idx += read_len;
Ok(read_len)
}
else {
est31 marked this conversation as resolved.
Show resolved Hide resolved
// Invalidate the buffer
self.buf_len = 0;
self.read.read(buf)
}
}
}

impl<'a, R: Read> RevRead<'a> for RevReader<R> {
type Buf = &'a [u8];

fn rev_read(&'a mut self, amount: usize) -> io::Result<(Self::Buf, usize)> {
let amount = amount.min(self.buf.len());
let buf = &mut self.buf[..amount];
let count = self.read.read(buf)?;

self.buf_len = count;
self.buf_idx = count;
Ok((&*self.buf, count))
}

fn revert(&mut self, amount: usize) -> io::Result<()> {
if amount > self.buf_idx || self.buf_len == 0 {
return Err(io::Error::new(ErrorKind::Other, "invalid revert"));
}

self.buf_idx -= amount;
Ok(())
}
}

#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
Searching,
FoundWithNeeded(u8),
SeekNeeded(i32),
RevertNeeded(usize),
Found,
}

enum UntilPageHeaderResult {
Eof,
Found,
ReadNeeded,
SeekNeeded,
RevertNeeded,
}

struct UntilPageHeaderReader {
Expand Down Expand Up @@ -583,27 +731,28 @@ impl UntilPageHeaderReader {
/// return Ok(true) if the full header has been read and can be extracted with
///
/// or return Ok(false) if the
pub fn do_read<R :Read>(&mut self, mut rdr :R)
-> Result<UntilPageHeaderResult, OggReadError> {
pub fn do_read<R>(&mut self, rdr :&mut R)
-> Result<UntilPageHeaderResult, OggReadError>
where
for<'a> R: RevRead<'a>
{
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
// The array's size is freely choosable, but must be > 27,
// and must well fit into an i32 (needs to be stored in SeekNeeded)
let mut buf :[u8; 1024] = [0; 1024];

let rd_len = tri!(rdr.read(if self.read_amount < 27 {
let rd_amount = if self.read_amount < 27 {
// This is an optimisation for the most likely case:
// the next page directly follows the current read position.
// Then it would be a waste to read more than the needed amount.
&mut buf[0 .. 27 - self.read_amount]
27 - self.read_amount
} else {
match self.mode {
Searching => &mut buf,
FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
SeekNeeded(_) => return Ok(Res::SeekNeeded),
Searching => usize::MAX,
FoundWithNeeded(amount) => amount as usize,
RevertNeeded(_) => return Ok(Res::RevertNeeded),
Found => return Ok(Res::Found),
}
}));
};
let (buf, rd_len) = tri!(rdr.rev_read(rd_amount));
let buf = buf.as_ref();

if rd_len == 0 {
// Reached EOF. This means we're in one of these cases:
Expand Down Expand Up @@ -678,18 +827,21 @@ impl UntilPageHeaderReader {
// Seek back so that we are at the position
// right after the header.

self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
return Ok(Res::SeekNeeded);
self.mode = RevertNeeded(fnd_buf.len() - needed);
return Ok(Res::RevertNeeded);
}
}
pub fn do_seek<S :Seek>(&mut self, mut skr :S)
-> Result<UntilPageHeaderResult, OggReadError> {
pub fn do_revert<R>(&mut self, rdr :&mut R)
-> Result<UntilPageHeaderResult, OggReadError>
where
for<'a> R: RevRead<'a>
{
use self::UntilPageHeaderReaderMode::*;
use self::UntilPageHeaderResult as Res;
match self.mode {
Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
SeekNeeded(offs) => {
tri!(skr.seek(SeekFrom::Current(offs as i64)));
RevertNeeded(amount) => {
tri!(rdr.revert(amount));
self.mode = Found;
Ok(Res::Found)
},
Expand All @@ -715,15 +867,21 @@ consistent when it encounters the `WouldBlock` error kind.
If you desire async functionality, consider enabling the `async` feature
and look into the async module.
*/
pub struct PacketReader<T :io::Read + io::Seek> {
pub struct PacketReader<T>
where
for<'a> T: RevRead<'a>
{
rdr :T,

base_pck_rdr :BasePacketReader,

read_some_pg :bool
}

impl<T :io::Read + io::Seek> PacketReader<T> {
impl<T> PacketReader<T>
where
for<'a> T: RevRead<'a>
{
/// Constructs a new `PacketReader` with a given `Read`.
pub fn new(rdr :T) -> PacketReader<T> {
PacketReader { rdr, base_pck_rdr : BasePacketReader::new(), read_some_pg : false }
Expand Down Expand Up @@ -785,7 +943,7 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
break
},
ReadNeeded => tri!(r.do_read(&mut self.rdr)),
SeekNeeded => tri!(r.do_seek(&mut self.rdr))
RevertNeeded => tri!(r.do_revert(&mut self.rdr))
}
}
Ok(Some(r.into_header()))
Expand Down Expand Up @@ -815,6 +973,17 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
}

/// Resets the internal state by deleting all
/// unread packets.
pub fn delete_unread_packets(&mut self) {
self.base_pck_rdr.update_after_seek();
}
}

impl<T> PacketReader<T>
where
for<'a> T: RevRead<'a> + io::Seek
{
/// Seeks the underlying reader
///
/// Seeks the reader that this PacketReader bases on by the specified
Expand Down Expand Up @@ -1015,11 +1184,6 @@ impl<T :io::Read + io::Seek> PacketReader<T> {
}
}
}
/// Resets the internal state by deleting all
/// unread packets.
pub fn delete_unread_packets(&mut self) {
self.base_pck_rdr.update_after_seek();
}
}

// util function
Expand Down