Cleanup ChunkReader (#4118) #4156

Merged · 5 commits · May 2, 2023

Changes from 1 commit
39 changes: 22 additions & 17 deletions parquet/src/arrow/async_reader/mod.rs
@@ -766,6 +766,25 @@ enum ColumnChunkData {
Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
fn get(&self, start: u64) -> Result<Bytes> {
match &self {
ColumnChunkData::Sparse { data, .. } => data
.binary_search_by_key(&start, |(offset, _)| *offset as u64)
.map(|idx| data[idx].1.clone())
.map_err(|_| {
ParquetError::General(format!(
"Invalid offset in sparse column chunk data: {start}"
))
}),
ColumnChunkData::Dense { offset, data } => {
let start = start as usize - *offset;
Ok(data.slice(start..))
}
}
}
}

impl Length for ColumnChunkData {
fn len(&self) -> u64 {
match &self {
@@ -778,26 +797,12 @@ impl Length for ColumnChunkData {
impl ChunkReader for ColumnChunkData {
type T = bytes::buf::Reader<Bytes>;

fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
Ok(self.get_bytes(start, length)?.reader())
fn get_read(&self, start: u64) -> Result<Self::T> {
Ok(self.get(start)?.reader())
}

fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
match &self {
ColumnChunkData::Sparse { data, .. } => data
.binary_search_by_key(&start, |(offset, _)| *offset as u64)
.map(|idx| data[idx].1.slice(0..length))
.map_err(|_| {
ParquetError::General(format!(
"Invalid offset in sparse column chunk data: {start}"
))
}),
ColumnChunkData::Dense { offset, data } => {
let start = start as usize - *offset;
let end = start + length;
Ok(data.slice(start..end))
}
}
Ok(self.get(start)?.slice(..length))
}
}

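The consolidated `get` above now backs both `get_read` and `get_bytes`. As a hedged, self-contained sketch of the sparse-offset lookup it performs (a standalone mirror for illustration, not the crate's private type): reads into a sparse chunk must start exactly at a stored page offset.

use bytes::Bytes;

// Standalone mirror of the sparse lookup in ColumnChunkData::get:
// `data` holds (file offset, page bytes) pairs sorted by offset, and a
// read must start exactly at one of the stored offsets.
fn sparse_get(data: &[(u64, Bytes)], start: u64) -> Result<Bytes, String> {
    data.binary_search_by_key(&start, |(offset, _)| *offset)
        .map(|idx| data[idx].1.clone())
        .map_err(|_| format!("Invalid offset in sparse column chunk data: {start}"))
}

fn main() {
    let pages = [(0_u64, Bytes::from_static(b"page0")), (5, Bytes::from_static(b"page1"))];
    assert_eq!(sparse_get(&pages, 5).unwrap(), Bytes::from_static(b"page1"));
    assert!(sparse_get(&pages, 3).is_err()); // mid-page offsets are rejected
}
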
3 changes: 1 addition & 2 deletions parquet/src/bin/parquet-layout.rs
@@ -175,8 +175,7 @@ fn read_page_header<C: ChunkReader>(
}
}

let len = reader.len().checked_sub(offset).unwrap() as usize;
let input = reader.get_read(offset, len)?;
let input = reader.get_read(offset)?;
let mut tracked = TrackedRead(input, 0);
let mut prot = TCompactInputProtocol::new(&mut tracked);
let header = PageHeader::read_from_in_protocol(&mut prot)?;
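
With the length argument gone, the reader handed to `TCompactInputProtocol` is unbounded, and the code relies on `TrackedRead` to count consumed bytes. A minimal sketch of such a counting wrapper, assuming `TrackedRead` in parquet-layout.rs is a tuple struct of reader and byte count, as the call above suggests:

use std::io::{Read, Result};

// Byte-counting Read wrapper in the spirit of TrackedRead: forwards
// reads to the inner reader and accumulates how many bytes were read.
struct TrackedRead<R>(R, usize);

impl<R: Read> Read for TrackedRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = self.0.read(buf)?;
        self.1 += n;
        Ok(n)
    }
}
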
2 changes: 1 addition & 1 deletion parquet/src/file/footer.rs
@@ -46,7 +46,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat

let mut footer = [0_u8; 8];
chunk_reader
.get_read(file_size - 8, 8)?
.get_read(file_size - 8)?
.read_exact(&mut footer)?;

let metadata_len = decode_footer(&footer)?;
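
For context, the 8 bytes read here are the fixed Parquet footer: a 4-byte little-endian metadata length followed by the `PAR1` magic. A hedged sketch of the decoding step (the crate's actual `decode_footer` may differ in error handling):

// Decode the trailing 8-byte Parquet footer: 4-byte little-endian
// metadata length, then the 4-byte "PAR1" magic.
fn decode_footer(footer: &[u8; 8]) -> Result<usize, String> {
    if &footer[4..] != b"PAR1" {
        return Err("Invalid Parquet file. Corrupt footer".to_owned());
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(metadata_len as usize)
}
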
4 changes: 2 additions & 2 deletions parquet/src/file/page_index/index_reader.rs
@@ -50,7 +50,7 @@ pub fn read_columns_indexes<R: ChunkReader>(
}

// read all needed data into buffer
let mut reader = reader.get_read(offset, length)?;
let mut reader = reader.get_read(offset)?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;

@@ -93,7 +93,7 @@ pub fn read_pages_locations<R: ChunkReader>(
}

// read all needed data into buffer
let mut reader = reader.get_read(offset, total_length)?;
let mut reader = reader.get_read(offset)?;
let mut data = vec![0; total_length];
reader.read_exact(&mut data)?;

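Both index-reader hunks land on the same pattern: with `get_read` now unbounded, the exact length is enforced by `read_exact` into a pre-sized buffer. As a generic sketch:

use std::io::{Read, Result};

// Read exactly `length` bytes from an unbounded reader; read_exact
// fails with UnexpectedEof if the source is too short.
fn read_exact_range<R: Read>(mut reader: R, length: usize) -> Result<Vec<u8>> {
    let mut data = vec![0; length];
    reader.read_exact(&mut data)?;
    Ok(data)
}
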
10 changes: 6 additions & 4 deletions parquet/src/file/reader.rs
@@ -48,15 +48,17 @@ pub trait Length {
/// For an object store reader, each read can be mapped to a range request.
pub trait ChunkReader: Length + Send + Sync {
type T: Read + Send;
/// Get a serially readable slice of the current reader
/// This should fail if the slice exceeds the current bounds
fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
/// Get a [`Read`] starting at the provided file offset
fn get_read(&self, start: u64) -> Result<Self::T>;

/// Get a range as bytes
/// This should fail if the exact number of bytes cannot be read
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(length);
let read = self.get_read(start, length)?.read_to_end(&mut buffer)?;
let read = self
.get_read(start)?
.take(length as _)
.read_to_end(&mut buffer)?;

if read != length {
return Err(eof_err!(
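
To make the revised contract concrete, here is a hedged sketch of the post-cleanup trait shape with a trivial in-memory implementor (simplified stand-ins for illustration, not the crate's exact definitions):

use std::io::{Error, ErrorKind, Read, Result};
use bytes::{Buf, Bytes};

// Simplified mirror of the post-cleanup ChunkReader: get_read takes
// only a start offset, and the default get_bytes bounds the stream
// with Read::take instead of a length parameter on get_read.
trait ChunkReader {
    type T: Read;
    fn get_read(&self, start: u64) -> Result<Self::T>;

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(length);
        let read = self
            .get_read(start)?
            .take(length as u64)
            .read_to_end(&mut buffer)?;
        if read != length {
            return Err(Error::new(ErrorKind::UnexpectedEof, "unexpected end of file"));
        }
        Ok(buffer.into())
    }
}

impl ChunkReader for Bytes {
    type T = bytes::buf::Reader<Bytes>;
    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.slice(start as usize..).reader())
    }
}

The real trait additionally requires Length + Send + Sync and documents that get_bytes must fail if the exact number of bytes cannot be read; both are omitted here for brevity.
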
17 changes: 8 additions & 9 deletions parquet/src/file/serialized_reader.rs
@@ -65,8 +65,8 @@ impl TryClone for File {
impl ChunkReader for File {
type T = FileSource<File>;

fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
Ok(FileSource::new(self, start, length))
fn get_read(&self, start: u64) -> Result<Self::T> {
Ok(FileSource::new(self, start))
}
}

@@ -85,8 +85,9 @@ impl TryClone for Bytes {
impl ChunkReader for Bytes {
type T = bytes::buf::Reader<Bytes>;

fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
Ok(self.get_bytes(start, length)?.reader())
fn get_read(&self, start: u64) -> Result<Self::T> {
let start = start as usize;
Ok(self.slice(start..).reader())
}

fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
@@ -662,7 +663,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64, *remaining)?;
let mut read = self.reader.get_read(*offset as u64)?;
let header = if let Some(header) = next_page_header.take() {
*header
} else {
@@ -752,8 +753,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
continue;
}
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
@@ -807,8 +807,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*offset += buffered_header.compressed_page_size as usize;
*remaining_bytes -= buffered_header.compressed_page_size as usize;
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
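
Note the pattern across these hunks: call sites that previously passed `*remaining_bytes` as a hard bound now receive an unbounded reader, and the page reader's own offset bookkeeping enforces the limits. Where a bound is still wanted, `Read::take` restores the old behavior (hypothetical helper for illustration):

use std::io::{Read, Take};

// Reimpose an upper bound on the now-unbounded get_read stream.
fn bounded_read<R: Read>(reader: R, remaining_bytes: usize) -> Take<R> {
    reader.take(remaining_bytes as u64)
}
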
40 changes: 15 additions & 25 deletions parquet/src/util/io.rs
@@ -49,7 +49,6 @@ impl<T: Read + Seek + Length + TryClone> ParquetReader for T {}
pub struct FileSource<R: ParquetReader> {
reader: RefCell<R>,
start: u64, // start position in a file
end: u64, // end position in a file
buf: Vec<u8>, // buffer where bytes read in advance are stored
buf_pos: usize, // current position of the reader in the buffer
buf_cap: usize, // current number of bytes read into the buffer
@@ -60,7 +59,6 @@ impl<R: ParquetReader> fmt::Debug for FileSource<R> {
f.debug_struct("FileSource")
.field("reader", &"OPAQUE")
.field("start", &self.start)
.field("end", &self.end)
.field("buf.len", &self.buf.len())
.field("buf_pos", &self.buf_pos)
.field("buf_cap", &self.buf_cap)
@@ -70,12 +68,11 @@

impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
pub fn new(fd: &R, start: u64) -> Self {
let reader = RefCell::new(fd.try_clone().unwrap());
Self {
reader,
start,
end: start + length as u64,
buf: vec![0_u8; DEFAULT_BUF_SIZE],
buf_pos: 0,
buf_cap: 0,
@@ -112,9 +109,6 @@ impl<R: ParquetReader> FileSource<R> {

impl<R: ParquetReader> Read for FileSource<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];
Contributor Author:

This could actually benefit performance: it avoids artificially truncating the input buffer, which previously prevented large reads from bypassing the FileSource buffer when they could have.
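
As a generic sketch of the fast path the comment refers to (pattern only, not the exact FileSource code): when no buffered bytes remain and the caller's buffer is at least as large as the internal one, read directly into the caller's buffer instead of going through ours.

use std::io::{Read, Result};

struct Buffered<R: Read> {
    inner: R,
    buf: Vec<u8>,
    buf_pos: usize,
    buf_cap: usize,
}

impl<R: Read> Read for Buffered<R> {
    fn read(&mut self, out: &mut [u8]) -> Result<usize> {
        // Fast path: nothing buffered and a large request, so bypass the
        // internal buffer entirely; truncating `out` up front (as the
        // removed lines did) would defeat this branch.
        if self.buf_pos == self.buf_cap && out.len() >= self.buf.len() {
            return self.inner.read(out);
        }
        // Slow path: refill the internal buffer if empty, then copy out.
        if self.buf_pos == self.buf_cap {
            self.buf_cap = self.inner.read(&mut self.buf)?;
            self.buf_pos = 0;
        }
        let n = (self.buf_cap - self.buf_pos).min(out.len());
        out[..n].copy_from_slice(&self.buf[self.buf_pos..self.buf_pos + n]);
        self.buf_pos += n;
        Ok(n)
    }
}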


// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
@@ -134,12 +128,6 @@
}
}

impl<R: ParquetReader> Length for FileSource<R> {
fn len(&self) -> u64 {
self.end - self.start
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -151,7 +139,8 @@ mod tests {
#[test]
fn test_io_read_fully() {
let mut buf = vec![0; 8];
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
let mut src =
FileSource::new(&get_test_file("alltypes_plain.parquet"), 0).take(4);

let bytes_read = src.read(&mut buf[..]).unwrap();
assert_eq!(bytes_read, 4);
@@ -161,7 +150,8 @@
#[test]
fn test_io_read_in_chunks() {
let mut buf = vec![0; 4];
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
let mut src =
FileSource::new(&get_test_file("alltypes_plain.parquet"), 0).take(4);

let bytes_read = src.read(&mut buf[0..2]).unwrap();
assert_eq!(bytes_read, 2);
@@ -172,34 +162,34 @@

#[test]
fn test_io_read_pos() {
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
let mut src =
FileSource::new(&get_test_file("alltypes_plain.parquet"), 0).take(4);

let _ = src.read(&mut [0; 1]).unwrap();
assert_eq!(src.start, 1);
let read = src.read(&mut [0; 1]).unwrap();
assert_eq!(read, 1);

let _ = src.read(&mut [0; 4]).unwrap();
assert_eq!(src.start, 4);
let read = src.read(&mut [0; 4]).unwrap();
assert_eq!(read, 3);
}

#[test]
fn test_io_read_over_limit() {
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
let mut src =
FileSource::new(&get_test_file("alltypes_plain.parquet"), 0).take(4);

// Read all bytes from source
let _ = src.read(&mut [0; 128]).unwrap();
assert_eq!(src.start, 4);

// Try reading again, should return 0 bytes.
let bytes_read = src.read(&mut [0; 128]).unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(src.start, 4);
}

#[test]
fn test_io_seek_switch() {
let mut buf = vec![0; 4];
let mut file = get_test_file("alltypes_plain.parquet");
let mut src = FileSource::new(&file, 0, 4);
let mut src = FileSource::new(&file, 0).take(4);

file.seek(SeekFrom::Start(5_u64))
.expect("File seek to a position");
Expand All @@ -224,7 +214,7 @@ mod tests {
file.seek(SeekFrom::Start(3)).unwrap();

// create the FileSource reader that starts at pos 1 ('b')
let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1);
let mut chunk = FileSource::new(&file, 1);

// read the 'b' at pos 1
let mut res = vec![0u8; 1];